You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/06/28 05:57:30 UTC
[17/25] incubator-atlas git commit: ATLAS-1898: initial commit of ODF
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFImplementations.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFImplementations.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFImplementations.java
new file mode 100755
index 0000000..4021049
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFImplementations.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.text.MessageFormat;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ODFImplementations {
+
+ Logger logger = Logger.getLogger(ODFImplementations.class.getName());
+
+ private Map<String, String> implementations = new HashMap<String, String>();
+
+ public ODFImplementations(String path, ClassLoader cl) {
+ Enumeration<URL> resources;
+ try {
+ resources = cl.getResources(path);
+ } catch (IOException exc) {
+ logger.log(Level.WARNING, MessageFormat.format("An error occurred while reading properties ''0'' could not be loaded", path), exc);
+ return;
+ }
+ while (resources.hasMoreElements()) {
+ URL url = resources.nextElement();
+ try {
+ InputStream is = url.openStream();
+ if (is != null) {
+ Properties props = new Properties();
+ props.load(is);
+ for (Object key : props.keySet()) {
+ String keyString = (String) key;
+ try {
+ if (implementations.containsKey(key)) {
+ String existingClassString = implementations.get(keyString);
+ String newClassString = props.getProperty(keyString);
+ if (!existingClassString.equals(newClassString)) {
+ Class<?> existingClass = cl.loadClass(existingClassString);
+ Class<?> newClass = cl.loadClass(newClassString);
+ String superClass = null;
+ String subClass = null;
+ // select the class lowest in the class hierarchy
+ if (existingClass.isAssignableFrom(newClass)) {
+ superClass = existingClassString;
+ subClass = newClassString;
+ } else if (newClass.isAssignableFrom(existingClass)) {
+ superClass = newClassString;
+ subClass = existingClassString;
+ }
+ if (superClass != null) {
+ logger.log(Level.INFO, "Implementation for interface ''{0}'' was found more than once, using subclass ''{1}'' (found superclass ''{2}'')",
+ new Object[] { key, subClass, superClass });
+ implementations.put(keyString, subClass);
+ } else {
+ logger.log(Level.WARNING, "Implementation for interface ''{0}'' was found more than once, using ''{1}''. (Conflict between ''{1}'' and ''{2}'')",
+ new Object[] { key, existingClassString, newClassString });
+ }
+ }
+ } else {
+ cl.loadClass(props.getProperty(keyString));
+ implementations.put(keyString, props.getProperty(keyString));
+ }
+ } catch (ClassNotFoundException exc) {
+ logger.log(Level.SEVERE, "Class found in odf-implementation.properties file could not be loaded", exc);
+ }
+ }
+ }
+ } catch (IOException e) {
+ logger.log(Level.WARNING, MessageFormat.format("Properties ''0'' could not be loaded", url), e);
+ }
+ }
+ }
+
+ public Map<String, String> getImplementations() {
+ return implementations;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInitializer.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInitializer.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInitializer.java
new file mode 100755
index 0000000..64e54ad
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInitializer.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
+
+public class ODFInitializer {
+
+ static Logger logger = Logger.getLogger(ODFInitializer.class.getName());
+
+ static Object initLock = new Object();
+
+ private static boolean running = false;
+ private static long lastStopTimestamp = 0;
+ private static long lastStartTimestamp = 0;
+ private static boolean startStopInProgress = false;
+
+
+ public static long getLastStopTimestamp() {
+ synchronized (initLock) {
+ return lastStopTimestamp;
+ }
+ }
+
+ public static long getLastStartTimestamp() {
+ synchronized (initLock) {
+ return lastStartTimestamp;
+ }
+ }
+
+ public static boolean isRunning() {
+ synchronized (initLock) {
+ return running;
+ }
+ }
+
+ public static boolean isStartStopInProgress() {
+ return startStopInProgress;
+ }
+
+ public static void start() {
+ synchronized (initLock) {
+ if (!running) {
+ startStopInProgress = true;
+ DiscoveryServiceQueueManager qm = new ODFInternalFactory().create(DiscoveryServiceQueueManager.class);
+ try {
+ qm.start();
+ } catch (Exception e) {
+ logger.log(Level.WARNING, "Timeout occurred while starting ODF", e);
+ }
+ lastStartTimestamp = System.currentTimeMillis();
+ running = true;
+ startStopInProgress = false;
+ }
+ }
+ }
+
+ public static void stop() {
+ synchronized (initLock) {
+ if (running) {
+ startStopInProgress = true;
+ ODFInternalFactory f = new ODFInternalFactory();
+ DiscoveryServiceQueueManager qm = f.create(DiscoveryServiceQueueManager.class);
+ try {
+ qm.stop();
+ } catch (TimeoutException e) {
+ logger.log(Level.WARNING, "Timeout occurred while stopping ODF", e);
+ }
+ ThreadManager tm = f.create(ThreadManager.class);
+ tm.shutdownAllUnmanagedThreads();
+ AnalysisRequestTrackerStore arts = f.create(AnalysisRequestTrackerStore.class);
+ arts.clearCache();
+ lastStopTimestamp = System.currentTimeMillis();
+ running = false;
+ startStopInProgress = false;
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInternalFactory.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInternalFactory.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInternalFactory.java
new file mode 100755
index 0000000..4fd09a7
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInternalFactory.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.utils.ODFLogConfig;
+
+public class ODFInternalFactory {
+
+ private static Properties defaultImplemenetations = Utils.readConfigProperties("org/apache/atlas/odf/core/internal/odf-default-implementation.properties");
+ private static ODFImplementations overwrittenImplementations = null;
+ private static Map<Class<?>, Object> singletons = new HashMap<>();
+
+ public static String SINGLETON_MARKER = "@singleton";
+
+ static {
+ ODFLogConfig.run();
+
+ Logger logger = Logger.getLogger(ODFInternalFactory.class.getName());
+ ClassLoader cl = ODFInternalFactory.class.getClassLoader();
+ String overwriteConfig = "org/apahe/atlas/odf/odf-implementation.properties";
+ overwrittenImplementations = new ODFImplementations(overwriteConfig, cl);
+ if (overwrittenImplementations.getImplementations().isEmpty()) {
+ overwrittenImplementations = null;
+ } else {
+ logger.log(Level.INFO, "Found overwritten implementation config: {0}", overwrittenImplementations.getImplementations());
+ }
+ if (overwrittenImplementations == null) {
+ logger.log(Level.INFO, "Default implementations are used");
+ }
+ }
+
+ private Object createObject(Class<?> cl) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+ String clazz = null;
+ if (overwrittenImplementations != null) {
+ clazz = overwrittenImplementations.getImplementations().get(cl.getName());
+ }
+ if (clazz == null) {
+ clazz = defaultImplemenetations.getProperty(cl.getName());
+ }
+ if (clazz == null) {
+ // finally try to instantiate the class as such
+ clazz = cl.getName();
+ }
+ boolean isSingleton = false;
+ if (clazz.endsWith(SINGLETON_MARKER)) {
+ clazz = clazz.substring(0, clazz.length() - SINGLETON_MARKER.length());
+ isSingleton = true;
+ }
+ Object o = null;
+ Class<?> implClass = this.getClass().getClassLoader().loadClass(clazz);
+ if (isSingleton) {
+ o = singletons.get(implClass);
+ if (o == null) {
+ o = implClass.newInstance();
+ singletons.put(implClass, o);
+ }
+ } else {
+ o = implClass.newInstance();
+ }
+ return o;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T create(Class<T> cl) {
+ try {
+ return (T) createObject(cl);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ } catch (InstantiationException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFUtils.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFUtils.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFUtils.java
new file mode 100755
index 0000000..623a727
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFUtils.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+
+public class ODFUtils {
+ public static int DEFAULT_TIMEOUT_SECS = 10 * 60; // 10 minutes
+
+ public static AnalysisRequestStatus runSynchronously(AnalysisManager analysisManager, AnalysisRequest request) {
+ return runSynchronously(analysisManager, request, DEFAULT_TIMEOUT_SECS); // default is
+ }
+
+ public static AnalysisRequestStatus runSynchronously(AnalysisManager analysisManager, AnalysisRequest request, int timeoutInSeconds) {
+ Logger logger = Logger.getLogger(ODFUtils.class.getName());
+ AnalysisResponse response = analysisManager.runAnalysis(request);
+ if (response.isInvalidRequest()) {
+ AnalysisRequestStatus status = new AnalysisRequestStatus();
+ status.setState(AnalysisRequestStatus.State.ERROR);
+ status.setDetails(MessageFormat.format("Request was invalid. Details: {0}", response.getDetails()));
+ status.setRequest(request);
+ return status;
+ }
+ AnalysisRequestStatus status = null;
+ long startTime = System.currentTimeMillis();
+ boolean timeOutReached = false;
+ do {
+ logger.fine("Polling for result...");
+ status = analysisManager.getAnalysisRequestStatus(response.getId());
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ long currentTime = System.currentTimeMillis();
+ timeOutReached = (currentTime - startTime) > (timeoutInSeconds * 1000);
+ } while ((AnalysisRequestStatus.State.ACTIVE.equals(status.getState()) || AnalysisRequestStatus.State.QUEUED.equals(status.getState()) //
+ && !timeOutReached));
+ return status;
+
+ }
+
+ public static AnalysisRequestStatus.State combineStates(List<AnalysisRequestStatus.State> allStates) {
+ // if one of the requests is in error, so is the complete request
+ if (allStates.contains(AnalysisRequestStatus.State.ERROR)) {
+ return AnalysisRequestStatus.State.ERROR;
+ }
+ // if no request could be found -> not found
+ if (Utils.containsOnly(allStates, new AnalysisRequestStatus.State[] { AnalysisRequestStatus.State.NOT_FOUND })) {
+ return AnalysisRequestStatus.State.NOT_FOUND;
+ }
+ // if all request are either not found or finished -> finished
+ if (Utils.containsOnly(allStates, new AnalysisRequestStatus.State[] { AnalysisRequestStatus.State.NOT_FOUND, AnalysisRequestStatus.State.FINISHED })) {
+ return AnalysisRequestStatus.State.FINISHED;
+ }
+ // else always return active
+ return AnalysisRequestStatus.State.ACTIVE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/OpenDiscoveryFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/OpenDiscoveryFrameworkImpl.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/OpenDiscoveryFrameworkImpl.java
new file mode 100755
index 0000000..e8361fd
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/OpenDiscoveryFrameworkImpl.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.OpenDiscoveryFramework;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.importer.JDBCMetadataImporter;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.engine.EngineManager;
+import org.apache.atlas.odf.api.engine.ServiceRuntimesInfo;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class OpenDiscoveryFrameworkImpl implements OpenDiscoveryFramework {
+
+ private Logger logger = Logger.getLogger(OpenDiscoveryFrameworkImpl.class.getName());
+
+ public OpenDiscoveryFrameworkImpl() {
+ if (!ODFInitializer.isRunning() && !ODFInitializer.isStartStopInProgress()) {
+ logger.log(Level.INFO, "Initializing Open Discovery Platform");
+ ODFInitializer.start();
+ getEngineManager().checkHealthStatus(); // This implicitly initializes the control center and the message queues
+
+ logger.log(Level.INFO, "Open Discovery Platform successfully initialized.");
+
+ // log active runtimes
+ ServiceRuntimesInfo activeRuntimesInfo = ServiceRuntimes.getRuntimesInfo(ServiceRuntimes.getActiveRuntimes());
+ try {
+ logger.log(Level.INFO, "Active runtimes: ''{0}''", JSONUtils.toJSON(activeRuntimesInfo));
+ } catch (JSONException e) {
+ logger.log(Level.WARNING, "Active runtime info has wrong format", e);
+ }
+ }
+ }
+
+ public AnalysisManager getAnalysisManager() {
+ return new ODFInternalFactory().create(AnalysisManager.class);
+ }
+
+ public DiscoveryServiceManager getDiscoveryServiceManager() {
+ return new ODFInternalFactory().create(DiscoveryServiceManager.class);
+ }
+
+ public EngineManager getEngineManager() {
+ return new ODFInternalFactory().create(EngineManager.class);
+ }
+
+ public SettingsManager getSettingsManager() {
+ return new ODFInternalFactory().create(SettingsManager.class);
+ }
+
+ public AnnotationStore getAnnotationStore() {
+ return new ODFInternalFactory().create(AnnotationStore.class);
+ }
+
+ public MetadataStore getMetadataStore() {
+ return new ODFInternalFactory().create(MetadataStore.class);
+ }
+
+ public JDBCMetadataImporter getJDBCMetadataImporter() {
+ return new ODFInternalFactory().create(JDBCMetadataImporter.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/StandaloneEnvironment.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/StandaloneEnvironment.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/StandaloneEnvironment.java
new file mode 100755
index 0000000..e58dd37
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/StandaloneEnvironment.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+
+public class StandaloneEnvironment implements Environment {
+
+ @Override
+ public String getProperty(String propertyName) {
+ return System.getProperty(propertyName);
+ }
+
+ @Override
+ public String getCurrentUser() {
+ return System.getProperty("user.name");
+ }
+
+ @Override
+ public String getZookeeperConnectString() {
+ return getProperty("odf.zookeeper.connect");
+ }
+
+ @Override
+ public ConfigContainer getDefaultConfiguration() {
+ return Utils.readConfigurationFromClasspath("org/apache/atlas/odf/core/internal/odf-initial-configuration.json");
+ }
+
+ @Override
+ public Map<String, String> getPropertiesWithPrefix(String prefix) {
+ Map<String, String> foundProps = new HashMap<>();
+ Properties props = System.getProperties();
+ for (String key : props.stringPropertyNames()) {
+ if (key.startsWith(prefix)) {
+ foundProps.put(key, props.getProperty(key));
+ }
+ }
+ return foundProps;
+ }
+
+ @Override
+ public List<String> getActiveRuntimeNames() {
+ String p = getProperty("odf.active.runtimes");
+ if (p == null || p.equals("ALL")) {
+ return null;
+ }
+ if (p.equals("NONE")) {
+ return new ArrayList<>();
+ }
+ return Arrays.asList(p.split(","));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/Utils.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/Utils.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/Utils.java
new file mode 100755
index 0000000..060f9fb
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/Utils.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringTokenizer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.settings.KafkaConsumerConfig;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONObject;
+
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+
+public class Utils {
+
+ static Logger logger = Logger.getLogger(Utils.class.getName());
+
+ private static final List<Class<? extends Object>> MERGABLE_CLASSES = Arrays.asList(ConfigContainer.class, KafkaConsumerConfig.class, ODFSettings.class, DiscoveryServiceProperties.class);
+
+ public static void mergeODFPOJOs(Object source, Object update) {
+ if (!source.getClass().isAssignableFrom(update.getClass())) {
+ return;
+ }
+
+ Method[] sourceMethods = source.getClass().getDeclaredMethods();
+
+ for (Method getterMethod : sourceMethods) {
+ if (getterMethod.getName().startsWith("get") || getterMethod.getName().startsWith("is")) {
+ String setterMethodName = getterMethod.getName().replaceFirst("get", "set");
+ if (getterMethod.getName().startsWith("is")) {
+ setterMethodName = setterMethodName.replaceFirst("is", "set");
+ }
+ try {
+ Method setterMethod = source.getClass().getDeclaredMethod(setterMethodName, getterMethod.getReturnType());
+ Object updateValue = getterMethod.invoke(update);
+ if (updateValue != null) {
+ Object sourceValue = getterMethod.invoke(source);
+
+ if (sourceValue != null && MERGABLE_CLASSES.contains(updateValue.getClass())) {
+ //Value is another POJO, must also try merging these instead of overwriting
+ mergeODFPOJOs(sourceValue, updateValue);
+ setterMethod.invoke(source, sourceValue);
+ } else if (sourceValue instanceof Map && updateValue instanceof Map) {
+ Map updateJSON = (Map) updateValue;
+ Map sourceJSON = (Map) sourceValue;
+ for (Object key : updateJSON.keySet()) {
+ sourceJSON.put(key, updateJSON.get(key));
+ }
+ setterMethod.invoke(source, sourceJSON);
+ } else {
+ setterMethod.invoke(source, updateValue);
+ }
+ }
+
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException(MessageFormat.format("Objects of type {0} and {1} could not be merged, no matching method found for {2}!", source.getClass().getName(), update
+ .getClass().getName(), getterMethod.getName()), e);
+ } catch (SecurityException e) {
+ throw new RuntimeException(MessageFormat.format("Objects of type {0} and {1} could not be merged, method {2} could not be accessed (SecurityException)!", source.getClass()
+ .getName(), update.getClass().getName(), setterMethodName), e);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(MessageFormat.format("Objects of type {0} and {1} could not be merged, method {2} could not be accessed! (IllegalAccessException)", source.getClass()
+ .getName(), update.getClass().getName(), getterMethod.getName()), e);
+ } catch (IllegalArgumentException e) {
+ throw new RuntimeException(MessageFormat.format("Objects of type {0} and {1} could not be merged, method {2} does not accept the right parameters!", source.getClass().getName(),
+ update.getClass().getName(), setterMethodName), e);
+ } catch (InvocationTargetException e) {
+ e.printStackTrace();
+ throw new RuntimeException(MessageFormat.format("Objects of type {0} and {1} could not be merged, method {2} or {3} could not be invoked!", source.getClass().getName(), update
+ .getClass().getName(), getterMethod.getName(), setterMethodName), e);
+ }
+
+ }
+ }
+ }
+
+ public static Properties readConfigProperties(String path) {
+ // TODO cache this in static variables, it doesn't change at runtime
+ InputStream is = Utils.class.getClassLoader().getResourceAsStream(path);
+ if (is == null) {
+ return null;
+ }
+ Properties props = new Properties();
+ try {
+ props.load(is);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return props;
+ }
+
+ public static void setCurrentTimeAsLastModified(AnalysisRequestTracker tracker) {
+ tracker.setLastModified(System.currentTimeMillis());
+ }
+
+ public static String getExceptionAsString(Throwable exc) {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ exc.printStackTrace(pw);
+ String st = sw.toString();
+ return st;
+ }
+
+ public static String collectionToString(Collection<?> coll, String separator) {
+ StringBuffer buf = null;
+ for (Object o : coll) {
+ if (buf == null) {
+ buf = new StringBuffer("[ ");
+ } else {
+ buf.append(separator);
+ }
+ buf.append(o.toString());
+ }
+ buf.append(" ]");
+ return buf.toString();
+ }
+
+ public static <T> boolean containsOnly(List<T> l, T[] elements) {
+ for (T t : l) {
+ boolean containsOnlyElements = false;
+ for (T el : elements) {
+ if (t.equals(el)) {
+ containsOnlyElements = true;
+ break;
+ }
+ }
+ if (!containsOnlyElements) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public static <T> boolean containsNone(List<T> l, T[] elements) {
+ for (T t : l) {
+ boolean containsAnyElement = false;
+ for (T el : elements) {
+ if (t.equals(el)) {
+ containsAnyElement = true;
+ break;
+ }
+ }
+ if (containsAnyElement) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static List<String> splitString(String s, char separator) {
+ List<String> l = new ArrayList<String>();
+ if (s != null) {
+ StringTokenizer tok = new StringTokenizer(s, String.valueOf(separator));
+ while (tok.hasMoreTokens()) {
+ l.add(tok.nextToken());
+ }
+ }
+ return l;
+ }
+
+ public static String getInputStreamAsString(InputStream is, String encoding) {
+ try {
+ final int n = 2048;
+ byte[] b = new byte[0];
+ byte[] temp = new byte[n];
+ int bytesRead;
+ while ((bytesRead = is.read(temp)) != -1) {
+ byte[] newB = new byte[b.length + bytesRead];
+ System.arraycopy(b, 0, newB, 0, b.length);
+ System.arraycopy(temp, 0, newB, b.length, bytesRead);
+ b = newB;
+ }
+ String s = new String(b, encoding);
+ return s;
+ } catch (IOException exc) {
+ return getExceptionAsString(exc);
+ }
+ }
+
+ public static void mergeJSONObjects(JSONObject source, JSONObject target) {
+ if (source != null && target != null) {
+ target.putAll(source);
+ }
+ }
+
+ public static <T> T getValue(T value, T defaultValue) {
+ if (value == null) {
+ return defaultValue;
+ }
+ return value;
+ }
+
+ public static String getSystemPropertyExceptionIfMissing(String propertyName) {
+ Environment env = new ODFInternalFactory().create(Environment.class);
+ String value = env.getProperty(propertyName);
+ if (value == null) {
+ String msg = MessageFormat.format("System property ''{0}'' is not set", propertyName);
+ logger.log(Level.SEVERE, msg);
+ throw new RuntimeException(msg);
+ }
+ return value;
+ }
+
+ public static int getIntEnvironmentProperty(String propertyName, int defaultValue) {
+ Environment env = new ODFInternalFactory().create(Environment.class);
+ String value = env.getProperty(propertyName);
+ if (value == null) {
+ return defaultValue;
+ }
+ try {
+ return Integer.parseInt(value);
+ } catch(NumberFormatException exc) {
+ return defaultValue;
+ }
+ }
+
+
+ public static void runSystemCommand(String command) {
+ logger.log(Level.INFO, "Running system command: " + command);
+ try {
+ Runtime r = Runtime.getRuntime();
+ Process p = r.exec(command);
+ p.waitFor();
+ BufferedReader b = new BufferedReader(new InputStreamReader(p.getInputStream()));
+ String line = "";
+ while ((line = b.readLine()) != null) {
+ logger.log(Level.INFO, "System command out: " + line);
+ }
+ b.close();
+ } catch(IOException | InterruptedException e) {
+ logger.log(Level.INFO, "Error executing system command.", e);
+ }
+ }
+
+ public static ConfigContainer readConfigurationFromClasspath(String jsonFileInClasspath) {
+ InputStream is = SettingsManager.class.getClassLoader().getResourceAsStream(jsonFileInClasspath);
+ try {
+ JSONObject configJSON = new JSONObject(is);
+ ConfigContainer config = JSONUtils.fromJSON(configJSON.write(), ConfigContainer.class);
+ return config;
+ } catch (Exception exc) {
+ throw new RuntimeException(exc);
+ }
+ }
+
+ public static String joinStrings(List<String> l, char separator) {
+ String result = null;
+ if ((l != null) && !l.isEmpty()) {
+ StringBuilder buf = null;
+ for (String s : l) {
+ if (buf == null) {
+ buf = new StringBuilder();
+ } else {
+ buf.append(separator);
+ }
+ buf.append(s);
+ }
+ result = buf.toString();
+ }
+ return result;
+ }
+
+ public static String getEnvironmentProperty(String name, String defaultValue) {
+ Environment env = new ODFInternalFactory().create(Environment.class);
+ String s = env.getProperty(name);
+ return s != null ? s : defaultValue;
+ }
+
+ public static long getEnvironmentProperty(String name, long defaultValue) {
+ Environment env = new ODFInternalFactory().create(Environment.class);
+ String s = env.getProperty(name);
+ if (s == null) {
+ return defaultValue;
+ }
+ try {
+ return Long.parseLong(s);
+ } catch(NumberFormatException exc) {
+ String msg = MessageFormat.format("Property ''{0}'' could not be converted to an integer", new Object[]{name});
+ logger.log(Level.WARNING, msg);
+ return defaultValue;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/analysis/AnalysisManagerImpl.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/analysis/AnalysisManagerImpl.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/analysis/AnalysisManagerImpl.java
new file mode 100755
index 0000000..8f7fab2
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/analysis/AnalysisManagerImpl.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.analysis;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisCancelResult;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestSummary;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackers;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.ODFUtils;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ *
+ * External Java API for creating and managing analysis requests
+ *
+ */
+public class AnalysisManagerImpl implements AnalysisManager {
+
+ public final static char COMPOUND_REQUEST_SEPARATOR = ',';
+ private Logger logger = Logger.getLogger(AnalysisManagerImpl.class.getName());
+ private ControlCenter controlCenter;
+
+ public AnalysisManagerImpl() {
+ controlCenter = new ODFInternalFactory().create(ControlCenter.class);
+ }
+
+ /**
+ * Issues a new ODF analysis request
+ *
+ * @param request Analysis request
+ * @return Response containing the request id and status information
+ */
+ public AnalysisResponse runAnalysis(AnalysisRequest request) {
+ if (((request.getDiscoveryServiceSequence() == null) || request.getDiscoveryServiceSequence().isEmpty())
+ && ((request.getAnnotationTypes() == null) || request.getAnnotationTypes().isEmpty())) {
+ AnalysisResponse response = new AnalysisResponse();
+ response.setId(request.getId());
+ response.setDetails("Either a sequence of discovery service ids or a list of annotation types must be specified to initiate an analysis request.");
+ response.setInvalidRequest(true);
+ return response;
+ }
+
+ if ((request.getDataSets().size() == 1) || request.isProcessDataSetsSequentially()) {
+ logger.log(Level.INFO, "Using sequential request processing (maybe because there is only a single data set)");
+ AnalysisResponse response = controlCenter.startRequest(request);
+ logger.log(Level.INFO, "Request with ID ''{0}'' started on data sets ''{1}''. Complete request: {2}.",
+ new Object[] { response.getId(), request.getDataSets(), JSONUtils.lazyJSONSerializer(request) });
+ return response;
+ }
+
+ List<String> requestIDs = new ArrayList<String>();
+ List<String> detailsMessages = new ArrayList<String>();
+ boolean invalidRequest = true;
+ logger.log(Level.INFO, "Running requests for ''{0}'' data sets in parallel", request.getDataSets().size());
+ logger.log(Level.FINE, "Splitting request into multiple request for each data set. Data Sets: {0}", request.getDataSets());
+ for (MetaDataObjectReference dataSet : request.getDataSets()) {
+ AnalysisRequest partRequest = new AnalysisRequest();
+ partRequest.setDiscoveryServiceSequence(request.getDiscoveryServiceSequence());
+ partRequest.setAdditionalProperties(request.getAdditionalProperties());
+ partRequest.setDataSets(Collections.singletonList(dataSet));
+ AnalysisResponse partResponse = controlCenter.startRequest(partRequest);
+ if (!partResponse.isInvalidRequest()) {
+ String partRequestID = partResponse.getId();
+ requestIDs.add(partRequestID);
+ detailsMessages.add(partResponse.getDetails());
+ // as soon as one request is valid, we make the compound request valid
+ invalidRequest = false;
+ }
+ }
+ AnalysisResponse response = new AnalysisResponse();
+ response.setId(Utils.joinStrings(requestIDs, COMPOUND_REQUEST_SEPARATOR));
+ response.setDetails(Utils.joinStrings(detailsMessages, COMPOUND_REQUEST_SEPARATOR));
+ response.setInvalidRequest(invalidRequest);
+ return response;
+ }
+
+ /**
+ * Retrieve status of an ODF analysis request
+ *
+ * @param requestId Unique id of the analysis request
+ * @return Status of the analysis request
+ */
+ public AnalysisRequestStatus getAnalysisRequestStatus(String requestId) {
+ List<String> singleRequestIds = Utils.splitString(requestId, COMPOUND_REQUEST_SEPARATOR);
+ if (singleRequestIds.size() == 1) {
+ AnalysisRequestStatus status = controlCenter.getRequestStatus(requestId);
+ return status;
+ }
+ AnalysisRequestStatus compoundStatus = new AnalysisRequestStatus();
+ compoundStatus.setState(State.QUEUED);
+ AnalysisRequest compoundRequest = new AnalysisRequest(); // assemble a compound request
+ compoundRequest.setId(requestId);
+ List<String> allMessages = new ArrayList<String>();
+ List<MetaDataObjectReference> allDataSets = new ArrayList<>();
+ List<State> allStates = new ArrayList<>();
+ for (String singleRequestId : singleRequestIds) {
+ AnalysisRequestStatus singleStatus = controlCenter.getRequestStatus(singleRequestId);
+ if (compoundRequest.getDiscoveryServiceSequence() == null) {
+ // assume all fields of the single requests are the same
+ // since they were created through runAnalysis()
+ compoundRequest.setDiscoveryServiceSequence(singleStatus.getRequest().getDiscoveryServiceSequence());
+ compoundRequest.setAdditionalProperties(singleStatus.getRequest().getAdditionalProperties());
+ }
+ if (singleStatus.getRequest().getDataSets() != null) {
+ allDataSets.addAll(singleStatus.getRequest().getDataSets());
+ }
+ allStates.add(singleStatus.getState());
+ allMessages.add(singleStatus.getDetails());
+ }
+ compoundRequest.setDataSets(allDataSets);
+
+ compoundStatus.setState(ODFUtils.combineStates(allStates));
+ compoundStatus.setRequest(compoundRequest);
+ compoundStatus.setDetails(Utils.joinStrings(allMessages, COMPOUND_REQUEST_SEPARATOR));
+ return compoundStatus;
+ }
+
+ /**
+ * Retrieve statistics about all previous ODF analysis requests
+ *
+ * @return Request summary
+ */
+ public AnalysisRequestSummary getAnalysisStats() {
+ AnalysisRequestTrackerStore store = new ODFInternalFactory().create(AnalysisRequestTrackerStore.class);
+ return store.getRequestSummary();
+ }
+
+ /**
+ * Retrieve status details of recent ODF analysis requests
+ *
+ * @param offset Starting offset (use 0 to start with the latest request)
+ * @param limit Maximum number of analysis requests to be returned (use -1 to retrieve all requests)
+ * @return Status details for each discovery request
+ */
+ public AnalysisRequestTrackers getAnalysisRequests(int offset, int limit) {
+ AnalysisRequestTrackerStore store = new ODFInternalFactory().create(AnalysisRequestTrackerStore.class);
+ AnalysisRequestTrackers analysisrequestTrackers = new AnalysisRequestTrackers();
+ analysisrequestTrackers.setAnalysisRequestTrackers(store.getRecentTrackers(offset, limit));
+ return analysisrequestTrackers;
+ }
+
+ /**
+ * Request a specific ODF discovery request to be canceled
+ *
+ * @param requestId Unique id of the analysis request
+ * @return Status of the cancellation attempt
+ */
+ public AnalysisCancelResult cancelAnalysisRequest(String requestId) {
+ return controlCenter.cancelRequest(requestId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/annotation/InternalAnnotationStoreUtils.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/annotation/InternalAnnotationStoreUtils.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/annotation/InternalAnnotationStoreUtils.java
new file mode 100755
index 0000000..798b2d3
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/annotation/InternalAnnotationStoreUtils.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.annotation;
+
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResult;
+
+public class InternalAnnotationStoreUtils {
+
+ public static void storeDiscoveryServiceResult(DiscoveryServiceResult result, AnalysisRequest req) {
+ Logger logger = Logger.getLogger(InternalAnnotationStoreUtils.class.getName());
+ AnnotationStore mds = new ODFFactory().create().getAnnotationStore();
+ mds.setAnalysisRun(req.getId());
+ if (result != null) {
+ logger.log(Level.FINE, "Persisting annotations returned by discovery service");
+ List<Annotation> annotations = result.getAnnotations();
+ if (annotations != null) {
+ for (Annotation annot : annotations) {
+ // only persist if reference was not set
+ if (annot.getReference() == null) {
+ mds.store(annot);
+ } else {
+ logger.log(Level.WARNING, "Returned annotation object has a non-null reference set and will not be persisted (reference: {0})", annot.getReference().toString());
+ }
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigContainer.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigContainer.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigContainer.java
new file mode 100755
index 0000000..f779155
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigContainer.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.configuration;
+
+
+import java.util.List;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+/**
+ * {
+ * "odf" : {...},
+ * "userDefined" : {...}
+ * }
+ *
+ *
+ * This class is final, because reflection is used to access getters / setters in order to merge. This doesn't work with inherited methods
+ */
+@ApiModel(description="All ODF configuration options.")
+public final class ConfigContainer {
+
+ @ApiModelProperty(value="General ODF configuration options along with details about available discovery services", required=true)
+ private ODFSettings odf;
+
+ @ApiModelProperty(value="Details about available discovery services")
+ private List<DiscoveryServiceProperties> registeredServices = null;
+
+ public List<DiscoveryServiceProperties> getRegisteredServices() {
+ return registeredServices;
+ }
+
+ public void setRegisteredServices(List<DiscoveryServiceProperties> registeredServices) {
+ this.registeredServices = registeredServices;
+ }
+
+ public ODFSettings getOdf() {
+ return odf;
+ }
+
+ public void setOdf(ODFSettings odfSettings) {
+ this.odf = odfSettings;
+ }
+
+ public void validate() throws ValidationException {
+ if (this.odf != null) {
+ odf.validate();
+ }
+ if (this.registeredServices != null) {
+ new ServiceValidator().validate("ODFConfig.registeredServices", this.registeredServices);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigManager.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigManager.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigManager.java
new file mode 100755
index 0000000..7ad90e6
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigManager.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.configuration;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.settings.SparkConfig;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.core.Encryption;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.core.store.ODFConfigurationStorage;
+
+public class ConfigManager {
+ private Logger logger = Logger.getLogger(ConfigManager.class.getName());
+ public static final String HIDDEN_PASSWORD_IDENTIFIER = "***hidden***";
+ public static final long CONFIG_UPDATE_SLEEP_BETWEEN_POLLS = 20;
+ public static final int CONFIG_UPDATE_MAX_POLLS = 1500;
+ private static final String DEFAULT_ENCRYPTED_SPARK_CONFIGS = "spark.authenticate.secret,spark.ssl.keyPassword,spark.ssl.keyStorePassword,spark.ssl.trustStorePassword";
+
+ protected ODFConfigurationStorage configurationStore;
+ protected ODFConfigNotificationPublisher notificationManager;
+
+ public ConfigManager() {
+ ODFInternalFactory f = new ODFInternalFactory();
+ this.configurationStore = f.create(ODFConfigurationStorage.class);
+ this.notificationManager = f.create(ODFConfigNotificationPublisher.class);
+ }
+
+ public ConfigContainer getConfigContainer() {
+ ConfigContainer config = configurationStore.getConfig(getDefaultConfigContainer());
+ return config;
+ }
+
+ public ConfigContainer getConfigContainerHidePasswords() {
+ ConfigContainer config = configurationStore.getConfig(getDefaultConfigContainer());
+ hidePasswords(config);
+ return config;
+ }
+
+ public void updateConfigContainer(ConfigContainer update) throws ValidationException {
+ try {
+ update = JSONUtils.cloneJSONObject(update);
+ } catch (JSONException e) {
+ throw new RuntimeException(e);
+ }
+ update.validate();
+ ConfigContainer source = getConfigContainer();
+ unhideAndEncryptPasswords(update, source);
+
+ List<DiscoveryServiceProperties> newServicesToRun = new ArrayList<DiscoveryServiceProperties>();
+ if (update.getRegisteredServices() != null
+ && source.getRegisteredServices().size() < update.getRegisteredServices().size()) {
+ // store added services if update registers new ones
+ List<DiscoveryServiceProperties> newRegisteredServices = new ArrayList<DiscoveryServiceProperties>();
+ newRegisteredServices.addAll(update.getRegisteredServices());
+ for (DiscoveryServiceProperties oldService : source.getRegisteredServices()) {
+ for (int no = 0; no < newRegisteredServices.size(); no++) {
+ if (newRegisteredServices.get(no).getId().equals(oldService.getId())) {
+ newRegisteredServices.remove(no);
+ break;
+ }
+ }
+ }
+
+ newServicesToRun.addAll(newRegisteredServices);
+ }
+
+ Utils.mergeODFPOJOs(source, update);
+ configurationStore.storeConfig(source);
+
+ if (source.getOdf().getRunNewServicesOnRegistration() && !newServicesToRun.isEmpty()) {
+ runNewServices(newServicesToRun);
+ }
+
+ String changeId = UUID.randomUUID().toString();
+ configurationStore.addPendingConfigChange(changeId);
+ this.notificationManager.publishConfigChange(source, changeId);
+ for (int i=0; i < CONFIG_UPDATE_MAX_POLLS; i++) {
+ if (!configurationStore.isConfigChangePending(changeId)) {
+ logger.log(Level.INFO, MessageFormat.format("Config change id ''{0}'' successfully completed after {1} msec.", new Object[] { changeId, i * CONFIG_UPDATE_SLEEP_BETWEEN_POLLS } ));
+ return;
+ }
+ try {
+ Thread.sleep(CONFIG_UPDATE_SLEEP_BETWEEN_POLLS);
+ } catch (InterruptedException e) {
+ // Ignore interrupt
+ logger.log(Level.WARNING, "Sleep period was interrupted", e);
+ }
+ }
+ logger.log(Level.WARNING, MessageFormat.format("Config change did not complete after {0} msec.", CONFIG_UPDATE_SLEEP_BETWEEN_POLLS * CONFIG_UPDATE_MAX_POLLS));
+ }
+
+ public void resetConfigContainer() {
+ logger.warning("resetting ODF configuration!");
+ configurationStore.storeConfig(getDefaultConfigContainer());
+ }
+
+ private static String defaultConfig = null;
+
+ List<DiscoveryServiceProperties> getServicesFoundOnClassPath() throws IOException, JSONException {
+ ClassLoader cl = this.getClass().getClassLoader();
+ Enumeration<URL> services = cl.getResources("META-INF/odf/odf-services.json");
+ List<DiscoveryServiceProperties> result = new ArrayList<>();
+ while (services.hasMoreElements()) {
+ URL url = services.nextElement();
+ InputStream is = url.openStream();
+ String json = Utils.getInputStreamAsString(is, "UTF-8");
+ logger.log(Level.INFO, "Service found on the classpath at {0}: {1}", new Object[] { url, json });
+ result.addAll(JSONUtils.fromJSONList(json, DiscoveryServiceProperties.class));
+ }
+ logger.log(Level.INFO, "Number of classpath services found: {0}", result.size());
+ return result;
+ }
+
+ private ConfigContainer getDefaultConfigContainer() {
+ if (defaultConfig == null) {
+ try {
+ ConfigContainer config = new ODFInternalFactory().create(Environment.class).getDefaultConfiguration();
+ // now look for services found on the classpath
+ config.getRegisteredServices().addAll(getServicesFoundOnClassPath());
+ defaultConfig = JSONUtils.toJSON(config);
+ } catch (IOException | JSONException e) {
+ String msg = "Default config could not be loaded or parsed!";
+ logger.severe(msg);
+ throw new RuntimeException(msg, e);
+ }
+ }
+ try {
+ return JSONUtils.fromJSON(defaultConfig, ConfigContainer.class);
+ } catch (JSONException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void runNewServices(List<DiscoveryServiceProperties> newServices) {
+ ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class);
+ List<String> servicesToRun = new ArrayList<String>();
+ for (DiscoveryServiceProperties info : newServices) {
+ servicesToRun.add(info.getId());
+ }
+
+ AnalysisRequest req = new AnalysisRequest();
+ MetadataStore mds = new ODFFactory().create().getMetadataStore();
+ req.setDiscoveryServiceSequence(servicesToRun);
+ req.setDataSets(mds.search(mds.newQueryBuilder().objectType("DataSet").build()));
+ req.setIgnoreDataSetCheck(true);
+ cc.startRequest(req);
+ }
+
+ private void unhideAndEncryptPasswords(ConfigContainer updatedConfigContainer,
+ ConfigContainer originalConfiguration) {
+ if (updatedConfigContainer.getOdf() != null) {
+ String odfPassword = updatedConfigContainer.getOdf().getOdfPassword();
+ if (odfPassword != null) {
+ if (odfPassword.equals(HIDDEN_PASSWORD_IDENTIFIER)) {
+ // Password was not changed, therefore keep original
+ // encrypted password
+ updatedConfigContainer.getOdf().setOdfPassword(originalConfiguration.getOdf().getOdfPassword());
+ } else if (!Encryption.isEncrypted(odfPassword)) {
+ updatedConfigContainer.getOdf().setOdfPassword(Encryption.encryptText(odfPassword));
+ }
+ }
+ if (updatedConfigContainer.getOdf().getSparkConfig() != null) {
+ SparkConfig updatedSparkConfig = updatedConfigContainer.getOdf().getSparkConfig();
+ if (updatedSparkConfig.getConfigs() != null) {
+ List<String> encryptedSparkConfigs = Arrays.asList(DEFAULT_ENCRYPTED_SPARK_CONFIGS.split(","));
+ for (String configName : updatedSparkConfig.getConfigs().keySet()) {
+ if (encryptedSparkConfigs.contains(configName)) {
+ String updatedConfigValue = (String) updatedSparkConfig.getConfigs().get(configName);
+ if (updatedConfigValue.equals(HIDDEN_PASSWORD_IDENTIFIER)) {
+ // Encrypted value was not changed, therefore keep original
+ // Encrypted value
+ SparkConfig originalSparkConfig = originalConfiguration.getOdf().getSparkConfig();
+ updatedSparkConfig.setConfig(configName, originalSparkConfig.getConfigs().get(configName));
+ } else if (!Encryption.isEncrypted(updatedConfigValue)) {
+ updatedSparkConfig.setConfig(configName, Encryption.encryptText(updatedConfigValue));
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void hidePasswords(ConfigContainer configContainer) {
+ if (configContainer.getOdf() != null) {
+ if (configContainer.getOdf().getOdfPassword() != null) {
+ configContainer.getOdf().setOdfPassword(HIDDEN_PASSWORD_IDENTIFIER);
+ }
+ if ((configContainer.getOdf().getSparkConfig() != null)){
+ SparkConfig sparkConfig = configContainer.getOdf().getSparkConfig();
+ if (sparkConfig.getConfigs() != null) {
+ List<String> encryptedSparkConfigs = Arrays.asList(DEFAULT_ENCRYPTED_SPARK_CONFIGS.split(","));
+ for (String configName : sparkConfig.getConfigs().keySet()) {
+ if (((encryptedSparkConfigs.contains(configName)) && (sparkConfig.getConfigs().get(configName)) != null)) {
+ sparkConfig.setConfig(configName, HIDDEN_PASSWORD_IDENTIFIER);
+ }
+ }
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ODFConfigNotificationPublisher.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ODFConfigNotificationPublisher.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ODFConfigNotificationPublisher.java
new file mode 100755
index 0000000..a7f822f
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ODFConfigNotificationPublisher.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.configuration;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage.Type;
+import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class ODFConfigNotificationPublisher {
+
+ Logger logger = Logger.getLogger(ODFConfigNotificationPublisher.class.getName());
+
+ public void publishConfigChange(ConfigContainer update, String changeId) {
+ try {
+ logger.log(Level.FINE, "publishing config change: {0}", JSONUtils.toJSON(update));
+ ConfigContainer clone = JSONUtils.fromJSON(JSONUtils.toJSON(update), ConfigContainer.class);
+ AdminMessage amsg = new AdminMessage();
+ amsg.setId(changeId);
+ amsg.setAdminMessageType(Type.CONFIGCHANGE);
+ amsg.setConfigUpdateDetails(clone);
+ amsg.setDetails("Configuration update");
+ DiscoveryServiceQueueManager qm = new ODFInternalFactory().create(DiscoveryServiceQueueManager.class);
+ qm.enqueueInAdminQueue(amsg);
+ } catch (Exception exc) {
+ logger.log(Level.WARNING, "An unexpected exception occurres when writing to admin queue. Ignoring it", exc);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ServiceValidator.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ServiceValidator.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ServiceValidator.java
new file mode 100755
index 0000000..011d728
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ServiceValidator.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.configuration;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.settings.validation.PropertyValidator;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes;
+
+public class ServiceValidator implements PropertyValidator {
+
+ public void validate(String property, Object value) throws ValidationException {
+ validate(property, value, true);
+ }
+
+ private void validate(String property, Object value, boolean topLevel) throws ValidationException {
+ if (value == null) {
+ throw new ValidationException("Null values are not allowed for this property");
+ }
+
+ if (value instanceof List) {
+ List<DiscoveryServiceProperties> newServices = (List<DiscoveryServiceProperties>) value;
+ List<String> ids = new ArrayList<String>();
+ for (int no = 0; no < newServices.size(); no++) {
+ DiscoveryServiceProperties service = (DiscoveryServiceProperties) newServices.get(no);
+ validate(property, service, false);
+ String serviceId = service.getId();
+ if (ids.contains(serviceId)) {
+ throw new ValidationException(property, MessageFormat.format("you cannot register multiple services with the same id {0}!", serviceId));
+ } else {
+ ids.add(serviceId);
+ }
+ }
+ } else if (value instanceof DiscoveryServiceProperties) {
+ DiscoveryServiceProperties service = (DiscoveryServiceProperties) value;
+ if (service.getId() == null || service.getId().trim().isEmpty() || service.getName() == null || service.getName().trim().isEmpty() || service.getEndpoint() == null) {
+ throw new ValidationException(property, MessageFormat.format("A service requires {0}", "id, name and an endpoint"));
+ }
+
+ if (topLevel) {
+ List<String> regServices = new ArrayList<String>();
+ List<DiscoveryServiceProperties> services = new ODFFactory().create().getDiscoveryServiceManager().getDiscoveryServicesProperties();
+ for (DiscoveryServiceProperties regService : services) {
+ regServices.add(regService.getId());
+ }
+
+ if (regServices.contains(service.getId())) {
+ throw new ValidationException(property, MessageFormat.format("a service with id {0} already exists!", service.getId()));
+ }
+ }
+
+ ServiceRuntime runtime = ServiceRuntimes.getRuntimeForDiscoveryService(service);
+ runtime.validate(service);
+ } else {
+ throw new ValidationException(property, "only DiscoveryServiceRegistrationInfo objects or list of such objects are allowed for this property");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminMessage.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminMessage.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminMessage.java
new file mode 100755
index 0000000..fffff6f
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminMessage.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+
+// JSON
+public class AdminMessage {
+ public static enum Type {
+ SHUTDOWN, RESTART, CONFIGCHANGE
+ }
+
+ private Type adminMessageType;
+ private String details;
+ private ConfigContainer configUpdateDetails;
+ private String messageId;
+
+ public Type getAdminMessageType() {
+ return adminMessageType;
+ }
+
+ public void setAdminMessageType(Type adminMessageType) {
+ this.adminMessageType = adminMessageType;
+ }
+
+ public String getDetails() {
+ return details;
+ }
+
+ public void setDetails(String details) {
+ this.details = details;
+ }
+
+ public ConfigContainer getConfigUpdateDetails() {
+ return configUpdateDetails;
+ }
+
+ public void setConfigUpdateDetails(ConfigContainer configUpdateDetails) {
+ this.configUpdateDetails = configUpdateDetails;
+ }
+
+ public String getId() {
+ return this.messageId;
+ }
+
+ public void setId(String messageId) {
+ this.messageId = messageId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminQueueProcessor.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminQueueProcessor.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminQueueProcessor.java
new file mode 100755
index 0000000..874e061
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminQueueProcessor.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.core.ODFInitializer;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class AdminQueueProcessor implements QueueMessageProcessor {
+
+ private Logger logger = Logger.getLogger(AdminQueueProcessor.class.getName());
+
+ @Override
+ public void process(ExecutorService executorService, String msg, int partition, long offset) {
+ AdminMessage adminMessage;
+ try {
+ adminMessage = JSONUtils.fromJSON(msg, AdminMessage.class);
+ } catch (JSONException e) {
+ throw new RuntimeException(e);
+ }
+ switch (adminMessage.getAdminMessageType()) {
+ case SHUTDOWN:
+ initiateShutdown(executorService, false);
+ break;
+ case RESTART:
+ initiateShutdown(executorService, true);
+ break;
+ default:
+ // do nothing
+ }
+ }
+
+ static Object restartLockObject = new Object();
+
+ private void initiateShutdown(ExecutorService executorService, final boolean restart) {
+ logger.log(Level.INFO, "Shutdown of ODF was requested...");
+ Runnable shutDownRunnable = new Runnable() {
+
+ @Override
+ public void run() {
+ logger.log(Level.INFO, "Initiating shutdown");
+
+ // sleep some time before initiating the actual shutdown to give the process() a chance to return
+ // before it is itself shut down
+ long sleepTimeBeforeShutdown = 1000;
+ try {
+ Thread.sleep(sleepTimeBeforeShutdown);
+ } catch (InterruptedException e) {
+ // do nothing
+ e.printStackTrace();
+ }
+
+ synchronized (restartLockObject) {
+ logger.log(Level.INFO, "Shutting down ODF...");
+ try {
+ ODFInitializer.stop();
+ logger.log(Level.INFO, "ODF was shutdown");
+
+ if (restart) {
+ logger.log(Level.INFO, "Restarting ODF");
+ ODFInitializer.start();
+ logger.log(Level.INFO, "ODF restarted");
+ }
+ } catch (Exception e) {
+ logger.log(Level.SEVERE, "An unexpected error occurred when shutting down ODF", e);
+ }
+ }
+
+ }
+
+ };
+
+ executorService.submit(shutDownRunnable);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AnalysisRequestTrackerStore.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AnalysisRequestTrackerStore.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AnalysisRequestTrackerStore.java
new file mode 100755
index 0000000..e43bd45
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AnalysisRequestTrackerStore.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.List;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestSummary;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+
+
+public interface AnalysisRequestTrackerStore {
+
+ /**
+ * set the status of old requests which were last modified before the cutOffTimestamp
+ * with an optional detailsMessage
+ */
+ void setStatusOfOldRequest(long cutOffTimestamp, STATUS status, String detailsMessage);
+
+ // store / update the passed tracker
+ void store(AnalysisRequestTracker tracker);
+
+ AnalysisRequestTracker query(String analysisRequestId);
+
+ AnalysisRequestTracker findSimilarQueuedRequest(AnalysisRequest request);
+
+ /**
+ * @param number - number of trackers to retrieve, -1 for all
+ * @return
+ */
+ List<AnalysisRequestTracker> getRecentTrackers(int offset, int limit);
+
+ /**
+ * Clear any internal caches, if any.
+ */
+ void clearCache();
+
+ int getSize();
+
+ AnalysisRequestSummary getRequestSummary();
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AsyncDiscoveryServiceWrapper.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AsyncDiscoveryServiceWrapper.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AsyncDiscoveryServiceWrapper.java
new file mode 100755
index 0000000..8100f18
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AsyncDiscoveryServiceWrapper.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse.ResponseCode;
+import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService;
+import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncRunStatus;
+import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncStartResponse;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.core.Utils;
+
+public class AsyncDiscoveryServiceWrapper implements SyncDiscoveryService {
+
+ AsyncDiscoveryService wrappedService = null;
+
+ public AsyncDiscoveryServiceWrapper(AsyncDiscoveryService wrappedService) {
+ this.wrappedService = wrappedService;
+ }
+
+ @Override
+ public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
+ try {
+ DiscoveryServiceAsyncStartResponse asyncResponse = wrappedService.startAnalysis(request);
+ ResponseCode code = asyncResponse.getCode();
+ if (code != ResponseCode.OK) {
+ DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
+ response.setCode(code);
+ response.setDetails(asyncResponse.getDetails());
+ return response;
+ }
+ // poll the async service
+ final long maxWaitTimeSecs = Utils.getEnvironmentProperty("odf.async.max.wait.secs", 10 * 60); // default: 10 minutes
+ final long pollingIntervalMS = Utils.getEnvironmentProperty("odf.async.poll.interval.ms", 1000);
+ long maxPolls = (maxWaitTimeSecs * 1000) / pollingIntervalMS;
+ int pollCounter = 0;
+
+ DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
+ String runId = asyncResponse.getRunId();
+ while (pollCounter < maxPolls) {
+ Thread.sleep(pollingIntervalMS);
+ DiscoveryServiceAsyncRunStatus status = wrappedService.getStatus(runId);
+ switch (status.getState()) {
+ case NOT_FOUND:
+ // should not happen
+ response.setCode(ResponseCode.UNKNOWN_ERROR);
+ response.setDetails("Run ID " + runId + " was not found. This should not have happened.");
+ return response;
+ case ERROR:
+ response.setCode(ResponseCode.UNKNOWN_ERROR);
+ response.setDetails(status.getDetails());
+ return response;
+ case FINISHED:
+ response.setCode(ResponseCode.OK);
+ response.setDetails(status.getDetails());
+ response.setResult(status.getResult());
+ return response;
+ default:
+ // continue polling
+ pollCounter++;
+ }
+ }
+ response.setCode(ResponseCode.UNKNOWN_ERROR);
+ response.setDetails("Polled Async service for " + maxWaitTimeSecs + " seconds without positive result");
+ return response;
+ } catch (Exception exc) {
+ DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
+ response.setCode(ResponseCode.UNKNOWN_ERROR);
+ response.setDetails("An unknown error occurred: " + Utils.getExceptionAsString(exc));
+ return response;
+ }
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ wrappedService.setExecutorService(executorService);
+ }
+
+ public void setMetadataStore(MetadataStore metadataStore) {
+ wrappedService.setMetadataStore(metadataStore);
+ }
+
+ public void setAnnotationStore(AnnotationStore annotationStore) {
+ wrappedService.setAnnotationStore(annotationStore);
+ }
+
+ public DataSetCheckResult checkDataSet(DataSetContainer dataSetContainer) {
+ return wrappedService.checkDataSet(dataSetContainer);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ConfigChangeQueueProcessor.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ConfigChangeQueueProcessor.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ConfigChangeQueueProcessor.java
new file mode 100755
index 0000000..bcd2965
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ConfigChangeQueueProcessor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage.Type;
+import org.apache.atlas.odf.core.store.ODFConfigurationStorage;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class ConfigChangeQueueProcessor implements QueueMessageProcessor {
+
+ Logger logger = Logger.getLogger(ConfigChangeQueueProcessor.class.getName());
+
+ @Override
+ public void process(ExecutorService executorService, String msg, int partition, long offset) {
+ try {
+ AdminMessage amsg = JSONUtils.fromJSON(msg, AdminMessage.class);
+ if (Type.CONFIGCHANGE.equals(amsg.getAdminMessageType())) {
+ logger.info("Received config change: " + JSONUtils.toJSON(amsg));
+ ODFInternalFactory f = new ODFInternalFactory();
+ ODFConfigurationStorage configStorage = f.create(ODFConfigurationStorage.class);
+ configStorage.onConfigChange(amsg.getConfigUpdateDetails());
+ configStorage.removePendingConfigChange(amsg.getId());
+ }
+ } catch(Exception exc) {
+ logger.log(Level.WARNING, "An exception occurred while processing admin message", exc);
+ }
+ }
+
+}