You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/06/28 05:57:27 UTC

[14/25] incubator-atlas git commit: ATLAS-1898: initial commit of ODF

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/JDBCMetadataImporterImpl.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/JDBCMetadataImporterImpl.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/JDBCMetadataImporterImpl.java
new file mode 100755
index 0000000..4bccd6c
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/JDBCMetadataImporterImpl.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.metadata;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataQueryBuilder;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.importer.JDBCMetadataImportResult;
+import org.apache.atlas.odf.api.metadata.importer.JDBCMetadataImporter;
+import org.apache.atlas.odf.api.metadata.importer.MetadataImportException;
+import org.apache.atlas.odf.api.metadata.models.JDBCConnection;
+import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
+import org.apache.atlas.odf.api.metadata.models.Column;
+import org.apache.atlas.odf.api.metadata.models.Database;
+import org.apache.atlas.odf.api.metadata.models.Schema;
+import org.apache.atlas.odf.api.metadata.models.Table;
+
+public class JDBCMetadataImporterImpl implements JDBCMetadataImporter {
+	Logger logger = Logger.getLogger(JDBCMetadataImporterImpl.class.getName());
+	private WritableMetadataStore mds;
+	WritableMetadataStoreUtils mdsUtils;
+
+	public JDBCMetadataImporterImpl() {
+		MetadataStore currentMds = new ODFFactory().create().getMetadataStore();
+		if (currentMds instanceof WritableMetadataStore) {
+			this.mds = (WritableMetadataStore) currentMds;
+		} else {
+			String errorText = "Cannot import data because metadata store ''{0}'' does not support the WritableMetadataStore interface.";
+			throw new RuntimeException(MessageFormat.format(errorText , currentMds.getClass()));
+		}
+	}
+
+	@Override
+	public JDBCMetadataImportResult importTables(JDBCConnection connection, String dbName, String schemaPattern, String tableNamePattern)  {
+		Connection conn = null;
+		try {
+			logger.log(Level.FINE, "Importing tables...");
+			conn = DriverManager.getConnection(connection.getJdbcConnectionString(), connection.getUser(), connection.getPassword());
+			DatabaseMetaData dmd = conn.getMetaData();
+			List<MetaDataObjectReference> matchingDatabases = mds.search(mds.newQueryBuilder().objectType("Database").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, dbName).build());
+			Database odfDatabase = null;
+			if (!matchingDatabases.isEmpty()) {
+				odfDatabase = (Database) mds.retrieve(matchingDatabases.get(0));
+				mds.updateObject(odfDatabase);
+			} else {
+				odfDatabase = new Database();
+				List<MetaDataObjectReference> conList = new ArrayList<MetaDataObjectReference>();
+				odfDatabase.setConnections(conList);
+				odfDatabase.setName(dbName);
+				odfDatabase.setDbType(dmd.getDatabaseProductName());
+				odfDatabase.setDescription("Database " + dbName + " imported by JDBC2AtlasImporter on " + new Date());
+				mds.createObject(odfDatabase);
+			}
+			Map<String, Table> tableMap = new HashMap<String, Table>();
+			Map<String, Schema> schemaMap = new HashMap<>();
+			List<MetaDataObjectReference> schemaList = new ArrayList<MetaDataObjectReference>();
+			Set<String> tableNames = new HashSet<>();
+			ResultSet columnRS = dmd.getColumns(null, schemaPattern, tableNamePattern, null);
+			while (columnRS.next()) {
+				String columnName = columnRS.getString("COLUMN_NAME");
+				String schemaName = columnRS.getString("TABLE_SCHEM");
+				String tableName = columnRS.getString("TABLE_NAME");
+				String dataType = columnRS.getString("TYPE_NAME");
+				
+				Schema schema = schemaMap.get(schemaName);
+				if (schema == null) {
+					for (Schema s : mds.getSchemas(odfDatabase)) {
+						if (schemaName.equals(s.getName())) {
+							schema = s;
+							mds.updateObject(schema);
+							break;
+						}
+					}
+					if (schema == null) {
+						schema = new Schema();
+						schema.setName(schemaName);
+						schemaList.add(mds.createObject(schema));
+					}
+					schemaMap.put(schemaName, schema);
+					mds.addSchemaReference(odfDatabase, schema.getReference());
+				}
+				
+				String key = schemaName + "." + tableName;
+				Table tableObject = tableMap.get(key);
+				if (tableObject == null) {
+					for (Table t : mds.getTables(schema)) {
+						if (tableName.equals(t.getName())) {
+							tableObject = t;
+							mds.updateObject(tableObject);
+							break;
+						}
+					}
+					if (tableObject == null) {
+						tableObject = new Table();
+						tableObject.setName(tableName);
+						MetaDataObjectReference ref = mds.createObject(tableObject);
+						tableObject.setReference(ref);
+					}
+					tableNames.add(tableName);
+					tableMap.put(key, tableObject);
+					mds.addTableReference(schema, tableObject.getReference());
+				}
+				Column column = null;
+				for (Column c : mds.getColumns(tableObject)) {
+					if (columnName.equals(c.getName())) {
+						column = c;
+						break;
+					}
+				}
+				if (column == null) {
+					// Add new column only if a column with the same name does not exist
+					column = WritableMetadataStoreUtils.createColumn(columnName, dataType, null);
+					mds.createObject(column);
+				}
+				mds.addColumnReference(tableObject, column.getReference());
+			}
+			columnRS.close();
+			logger.log(Level.INFO, "Found {0} tables in database ''{1}'': ''{2}''", new Object[]{tableMap.keySet().size(), dbName, tableNames });
+
+			JDBCConnection odfConnection = null;
+			for (MetaDataObject c : mds.getConnections(odfDatabase)) {
+				if ((c instanceof JDBCConnection) && connection.getJdbcConnectionString().equals(((JDBCConnection) c).getJdbcConnectionString())) {
+					odfConnection = (JDBCConnection) c;
+					mds.updateObject(odfConnection);
+					break;
+				}
+			}
+			if (odfConnection == null) {
+				odfConnection = new JDBCConnection();
+				odfConnection.setJdbcConnectionString(connection.getJdbcConnectionString());
+				odfConnection.setUser(connection.getUser());
+				odfConnection.setPassword(connection.getPassword());
+				odfConnection.setDescription("JDBC connection for database " + dbName);
+				mds.createObject(odfConnection);
+			}
+			mds.addConnectionReference(odfDatabase, odfConnection.getReference());
+
+			mds.commit();
+			return new JDBCMetadataImportResult(dbName, odfDatabase.getReference().getId(), new ArrayList<String>( tableMap.keySet() ));
+		} catch (SQLException exc) {
+			throw new MetadataImportException(exc);
+		} finally {
+			if (conn != null) {
+				try {
+					conn.close();
+				} catch (SQLException e) {
+					e.printStackTrace();
+				}
+			}
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/SampleDataHelper.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/SampleDataHelper.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/SampleDataHelper.java
new file mode 100755
index 0000000..9169d8a
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/SampleDataHelper.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.metadata;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.MessageFormat;
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.Utils;
+
+/**
+ * Copies the sample data files bundled on the class path to the local temporary folder
+ * ("c:/tmp/" on Windows, "/tmp/" otherwise).
+ */
+public class SampleDataHelper {
+	private static Logger logger = Logger.getLogger(SampleDataHelper.class.getName());
+	private static final String SAMPLE_DATA_FILE_LIST = "sample-data-toc.properties";
+	private static final String SAMPLE_DATA_FILE_FOLDER = "org/apache/atlas/odf/core/metadata/internal/sampledata/";
+
+	/**
+	 * Reads the table of contents of the sample data folder and copies every file listed there.
+	 *
+	 * @throws RuntimeException if the table of contents or one of the sample files cannot be read or written
+	 */
+	public static void copySampleFiles() {
+		Properties toc = new Properties();
+		ClassLoader cl = SampleDataHelper.class.getClassLoader();
+		try {
+			try (InputStream tocStream = openResource(cl, SAMPLE_DATA_FILE_FOLDER + SAMPLE_DATA_FILE_LIST)) {
+				toc.load(tocStream);
+			}
+
+			for (String contentFileName : toc.stringPropertyNames()) {
+				logger.log(Level.INFO, "Processing sample file: {0}", contentFileName);
+				String url;
+				try (InputStream contentStream = openResource(cl, SAMPLE_DATA_FILE_FOLDER + contentFileName)) {
+					url = copySampleDataFileContents(contentStream, contentFileName);
+				}
+				logger.log(Level.INFO, "Sample data file ''{0}'' copied to {1}", new Object[] { contentFileName, url });
+			}
+		} catch(IOException e) {
+			logger.log(Level.FINE, "An unexpected exception occurred while copying the sample data files", e);
+			String messageText = MessageFormat.format("Content file list {0} could not be accessed.", SAMPLE_DATA_FILE_FOLDER + SAMPLE_DATA_FILE_LIST);
+			throw new RuntimeException(messageText, e);
+		}
+		logger.log(Level.INFO, "All sample data files created");
+	}
+
+	/**
+	 * Opens a class path resource, turning a missing resource into an IOException instead of
+	 * handing a null stream to the caller.
+	 */
+	private static InputStream openResource(ClassLoader cl, String resourceName) throws IOException {
+		InputStream stream = cl.getResourceAsStream(resourceName);
+		if (stream == null) {
+			throw new IOException(MessageFormat.format("Resource ''{0}'' was not found on the class path.", resourceName));
+		}
+		return stream;
+	}
+
+	/**
+	 * Writes the contents of the given stream (read as UTF-8) to the temporary folder.
+	 *
+	 * @param is Stream providing the file contents; it is not closed by this method itself
+	 * @param contentFile Short name of the target file
+	 * @return file URL of the file that was written
+	 * @throws IOException if reading the stream or writing the target file fails
+	 */
+	private static String copySampleDataFileContents(InputStream is, String contentFile) throws IOException {
+		String url = null;
+		String target = null;
+		// Locale.ROOT keeps the check independent of the default locale (e.g. Turkish lower-casing of 'I').
+		String os = System.getProperty("os.name").toLowerCase(java.util.Locale.ROOT);
+		if (os.startsWith("windows")) {
+			url = "file://localhost/c:/tmp/" + contentFile;
+			target = "c:/tmp/" + contentFile;
+		} else {
+			url = "file:///tmp/" + contentFile;
+			target = "/tmp/" + contentFile;
+		}
+		String content = Utils.getInputStreamAsString(is, "UTF-8");
+		// try-with-resources closes the file even when the write fails.
+		try (FileOutputStream fos = new FileOutputStream(target)) {
+			fos.write(content.getBytes("UTF-8"));
+		}
+		return url;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/WritableMetadataStore.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/WritableMetadataStore.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/WritableMetadataStore.java
new file mode 100755
index 0000000..8cc56d6
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/WritableMetadataStore.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.metadata;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.DataFileFolder;
+import org.apache.atlas.odf.api.metadata.models.DataStore;
+import org.apache.atlas.odf.api.metadata.models.Database;
+import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
+import org.apache.atlas.odf.api.metadata.models.RelationalDataSet;
+import org.apache.atlas.odf.api.metadata.models.Schema;
+
+/**
+ * Interface to be implemented by metadata stores that support write access, i.e. the creation of new metadata objects,
+ * update of existing metadata objects, and creation of references between metadata objects. The new or updated objects
+ * and references remain in a staging area until they are committed. This is necessary in order to avoid inconsistent
+ * states during comprehensive write operations.  
+ * 
+ *
+ */
+public interface WritableMetadataStore extends MetadataStore {
+
+	/**
+	 * Stages a new metadata object. Should the object already carry a reference, its id may be
+	 * replaced by a different one when the object is committed.
+	 *
+	 * @param metaDataObject Metadata object to be created
+	 * @return Reference of the staged metadata object
+	 */
+	MetaDataObjectReference createObject(MetaDataObject metaDataObject);
+
+	/**
+	 * Stages a changed version of a metadata object. The reference of the object has to identify an
+	 * object that already exists in the metadata store.
+	 *
+	 * @param metaDataObject Metadata object to be updated
+	 */
+	void updateObject(MetaDataObject metaDataObject);
+
+	/**
+	 * Writes everything that is currently staged to the metadata store.
+	 */
+	void commit();
+
+	/**
+	 * Stages a data file reference for a new or updated data file folder.
+	 * On commit the reference is merged with the references that already exist.
+	 *
+	 * @param folder Data file folder that receives the reference
+	 * @param reference Reference of the data file to be added
+	 */
+	void addDataFileReference(DataFileFolder folder, MetaDataObjectReference reference);
+
+	/**
+	 * Stages a data file folder reference for a new or updated data file folder.
+	 * On commit the reference is merged with the references that already exist.
+	 *
+	 * @param folder Data file folder that receives the reference
+	 * @param reference Reference of the data file folder to be added
+	 */
+	void addDataFileFolderReference(DataFileFolder folder, MetaDataObjectReference reference);
+
+	/**
+	 * Stages a schema reference for a new or updated database.
+	 * On commit the reference is merged with the references that already exist.
+	 *
+	 * @param database Database that receives the reference
+	 * @param reference Reference of the schema to be added
+	 */
+	void addSchemaReference(Database database, MetaDataObjectReference reference);
+
+	/**
+	 * Stages a table reference for a new or updated schema.
+	 * On commit the reference is merged with the references that already exist.
+	 *
+	 * @param schema Schema that receives the reference
+	 * @param reference Reference of the table to be added
+	 */
+	void addTableReference(Schema schema, MetaDataObjectReference reference);
+
+	/**
+	 * Stages a column reference for a new or updated relational data set.
+	 * On commit the reference is merged with the references that already exist.
+	 *
+	 * @param relationalDataSet Relational data set that receives the reference
+	 * @param reference Reference of the column to be added
+	 */
+	void addColumnReference(RelationalDataSet relationalDataSet, MetaDataObjectReference reference);
+
+	/**
+	 * Stages a connection reference for a new or updated data store.
+	 * On commit the reference is merged with the references that already exist.
+	 *
+	 * @param dataStore Data store that receives the reference
+	 * @param reference Reference of the connection to be added
+	 */
+	void addConnectionReference(DataStore dataStore, MetaDataObjectReference reference);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/WritableMetadataStoreBase.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/WritableMetadataStoreBase.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/WritableMetadataStoreBase.java
new file mode 100755
index 0000000..d5f8772
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/WritableMetadataStoreBase.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.metadata;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.metadata.InternalMetadataStoreBase;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStoreException;
+import org.apache.atlas.odf.api.metadata.StoredMetaDataObject;
+import org.apache.atlas.odf.api.metadata.models.DataFileFolder;
+import org.apache.atlas.odf.api.metadata.models.DataStore;
+import org.apache.atlas.odf.api.metadata.models.Database;
+import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
+import org.apache.atlas.odf.api.metadata.models.RelationalDataSet;
+import org.apache.atlas.odf.api.metadata.models.Schema;
+
+/**
+ * Common base for writable metadata stores.
+ * Note that the methods implemented by InternalMetadataStoreBase are not necessarily used by all classes that extend WritableMetadataStoreBase.
+ * (If Java supported multiple inheritance, WritableMetadataStoreBase and InternalMetadataStoreBase would be independent classes.)
+ * 
+ * 
+ */
+public abstract class WritableMetadataStoreBase extends InternalMetadataStoreBase implements WritableMetadataStore {
+	private static final Logger logger = Logger.getLogger(WritableMetadataStoreBase.class.getName());
+
+	/**
+	 * Provides the staging area used by this class, keyed by the id of the object reference.
+	 *
+	 * @return Map of staged objects
+	 */
+	abstract protected LinkedHashMap<String, StoredMetaDataObject> getStagedObjects();
+
+	/**
+	 * Appends a reference to the named reference list of a staged object.
+	 *
+	 * @param metaDataObject Object that receives the reference; it must have a reference and must be staged
+	 * @param attributeName Name of the reference list in the reference map of the staged object
+	 * @param reference Reference to be appended
+	 * @throws MetadataStoreException if the object has no reference or has not been staged
+	 */
+	private void addReference(MetaDataObject metaDataObject, String attributeName, MetaDataObjectReference reference) {
+		if (metaDataObject.getReference() == null) {
+			throw new MetadataStoreException("Cannot add a reference because metadata object reference is null.");
+		}
+		String objectId = metaDataObject.getReference().getId();
+		StoredMetaDataObject obj = this.getStagedObjects().get(objectId);
+		if (obj == null) {
+			String errorMessage = MessageFormat.format("A staged object with id ''{0}'' does not exist. Create or update the object before adding a reference.", objectId);
+			throw new MetadataStoreException(errorMessage);
+		}
+		// Create the reference list lazily on first use.
+		if (obj.getReferenceMap().get(attributeName) == null) {
+			obj.getReferenceMap().put(attributeName, new ArrayList<MetaDataObjectReference>());
+		}
+		obj.getReferenceMap().get(attributeName).add(reference);
+	}
+
+	@Override
+	public void addDataFileReference(DataFileFolder folder, MetaDataObjectReference reference) {
+		addReference(folder, ODF_DATAFILES_REFERENCE, reference);
+	}
+
+	@Override
+	public void addDataFileFolderReference(DataFileFolder folder, MetaDataObjectReference reference) {
+		addReference(folder, ODF_DATAFILEFOLDERS_REFERENCE, reference);
+	}
+
+	@Override
+	public void addSchemaReference(Database database, MetaDataObjectReference reference) {
+		addReference(database, ODF_SCHEMAS_REFERENCE, reference);
+	}
+
+	@Override
+	public void addTableReference(Schema schema, MetaDataObjectReference reference) {
+		addReference(schema, ODF_TABLES_REFERENCE, reference);
+	}
+
+	@Override
+	public void addColumnReference(RelationalDataSet relationalDataSet, MetaDataObjectReference reference) {
+		addReference(relationalDataSet, ODF_COLUMNS_REFERENCE, reference);
+	}
+
+	@Override
+	public void addConnectionReference(DataStore dataStore, MetaDataObjectReference reference) {
+		addReference(dataStore, ODF_CONNECTIONS_REFERENCE, reference);
+	}
+
+	/**
+	 * Puts the object into the staging area. An object without a reference gets a newly generated one.
+	 *
+	 * @param metaDataObject Metadata object to be staged
+	 * @return Reference of the staged object
+	 */
+	@Override
+	public MetaDataObjectReference createObject(MetaDataObject metaDataObject) {
+		if (metaDataObject.getReference() == null) {
+			metaDataObject.setReference(WritableMetadataStoreUtils.generateMdoRef(this));
+		}
+		this.getStagedObjects().put(metaDataObject.getReference().getId(), new StoredMetaDataObject(metaDataObject));
+		logger.log(Level.FINE, "Added new object of type ''{0}'' with id ''{1}'' to staging area.",
+				new Object[] { metaDataObject.getClass().getSimpleName(), metaDataObject.getReference().getId() });
+		return metaDataObject.getReference();
+	}
+
+	/**
+	 * Puts an updated version of an existing object into the staging area.
+	 *
+	 * @param metaDataObject Metadata object to be staged; its reference must resolve via retrieve()
+	 * @throws MetadataStoreException if the reference is null or cannot be retrieved from this store
+	 */
+	@Override
+	public void updateObject(MetaDataObject metaDataObject) {
+		if (metaDataObject.getReference() == null) {
+			// Plain string (not passed through MessageFormat), hence single quotes.
+			throw new MetadataStoreException("Reference attribute cannot be 'null' when updating a metadata object.");
+		}
+		if (retrieve(metaDataObject.getReference()) == null) {
+			throw new MetadataStoreException(
+					MessageFormat.format("An object with id ''{0}'' does not exist in this metadata store.",
+							metaDataObject.getReference().getId()));
+		}
+		this.getStagedObjects().put(metaDataObject.getReference().getId(), new StoredMetaDataObject(metaDataObject));
+		logger.log(Level.FINE, "Added updated object of type ''{0}'' with id ''{1}'' to staging area.",
+				new Object[] { metaDataObject.getClass().getSimpleName(), metaDataObject.getReference().getId() });
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/WritableMetadataStoreUtils.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/WritableMetadataStoreUtils.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/WritableMetadataStoreUtils.java
new file mode 100755
index 0000000..808b4d2
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/WritableMetadataStoreUtils.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.metadata;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.MetadataStoreException;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.metadata.models.ClassificationAnnotation;
+import org.apache.atlas.odf.api.metadata.models.JDBCConnection;
+import org.apache.atlas.odf.api.metadata.models.JDBCConnectionInfo;
+import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
+import org.apache.atlas.odf.api.metadata.models.BusinessTerm;
+import org.apache.atlas.odf.api.metadata.models.Column;
+import org.apache.atlas.odf.api.metadata.models.Connection;
+import org.apache.atlas.odf.api.metadata.models.ConnectionInfo;
+import org.apache.atlas.odf.api.metadata.models.DataFile;
+import org.apache.atlas.odf.api.metadata.models.DataFileFolder;
+import org.apache.atlas.odf.api.metadata.models.DataSet;
+import org.apache.atlas.odf.api.metadata.models.DataStore;
+import org.apache.atlas.odf.api.metadata.models.Database;
+import org.apache.atlas.odf.api.metadata.models.Document;
+import org.apache.atlas.odf.api.metadata.models.RelationalDataSet;
+import org.apache.atlas.odf.api.metadata.models.Schema;
+import org.apache.atlas.odf.api.metadata.models.Table;
+import org.apache.atlas.odf.api.metadata.models.UnknownDataSet;
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+import org.apache.atlas.odf.api.metadata.models.RelationshipAnnotation;
+
+/**
+ * Utilities to be used for implementing the {@link WritableMetadataStore} interface, i.e. for
+ * adding support for an additional writable metadata store to ODF.
+ *
+ *
+ */
+public class WritableMetadataStoreUtils {
+
+	/**
+	 * Utility method for creating and populating a new {@link Column} object.
+	 * The reference of the new object is not set by this method.
+	 *
+	 * @param name Name of the new column
+	 * @param dataType Data type of the new column
+	 * @param description Description of the new column
+	 * @return The resulting column object
+	 */
+	public static Column createColumn(String name, String dataType, String description) {
+		// Build a fresh column and copy the three supplied attributes onto it.
+		final Column newColumn = new Column();
+		newColumn.setName(name);
+		newColumn.setDescription(description);
+		newColumn.setDataType(dataType);
+		return newColumn;
+	}
+
+	public static String getFileUrl(String shortFileName) {
+		if (System.getProperty("os.name").toLowerCase().startsWith("windows")) {
+			return "file://localhost/c:/tmp/" + shortFileName;
+		} else {
+			return "file:///tmp/" + shortFileName;
+		}
+	}
+
+	/**
+	 * Utility method for generating a new metadata object reference that uses a random id and points
+	 * to a given metadata store.
+	 *
+	 * @param mds Metadata store to which the new reference should point
+	 * @return The resulting metadata object reference
+	 */
+	public static MetaDataObjectReference generateMdoRef(MetadataStore mds) {
+		// Random UUID as id, repository id taken from the given store, empty URL.
+		final String randomId = UUID.randomUUID().toString();
+		final MetaDataObjectReference generated = new MetaDataObjectReference();
+		generated.setId(randomId);
+		generated.setRepositoryId(mds.getRepositoryId());
+		generated.setUrl("");
+		return generated;
+	}
+
+	/**
+	 * Utility method that creates the ODF example objects used for the ODF integration tests
+	 * in a given writable metadata store.
+	 *
+	 * @param mds Metadata store in which the example objects are created
+	 */
+	public static void createSampleDataObjects(WritableMetadataStore mds) {
+		DataFile bankClients = new DataFile();
+		bankClients.setName("BankClientsShort");
+		bankClients.setDescription("A reduced sample data file containing bank clients.");
+		bankClients.setUrlString(getFileUrl("bank-clients-short.csv"));
+		mds.createObject(bankClients);
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("CLIENT_ID", "string", "A client ID (column 1)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("NAME", "string", "A client name (column 2)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("ADDRESS", "string", "A client's address (column 3)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("ZIP", "string", "Zip code (column 4)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("AGE", "double", "Age in years (column 5)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("GENDER", "string", "Person gender (column 6)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("MARITAL_STATUS", "string", "Marital status (column 7)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("PROFESSION", "string", "Profession (column 8)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("NBR_YEARS_CLI", "double", "The number of years how long the client has been with us (column 9)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("SAVINGS_ACCOUNT", "string", "Savings account number (column 10)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("ONLINE_ACCESS", "string", "A flag indicating if the client accesses her accounts online (column 11)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("JOINED_ACCOUNTS", "string", "A flag indicating if the client has joined accounts (column 12)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("BANKCARD", "string", "A flag indicating if the client has a bankcard (column 13)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("AVERAGE_BALANCE", "double", "The average balance over the last year (column 14)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("ACCOUNT_ID", "int", "Account Id / number (column 15)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("ACCOUNT_TYPE", "string", "Type of account (column 16)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("EMAIL", "string", "A flag indicating if the client has joined accounts (column 17)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("CCN", "string", "Credit card number (column 18)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("PHONE1", "string", "Primary hone number (column 19)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("PHONE2", "string", "Secondary phone number (column 20)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("CC", "string", "CC indicator (column 21)")));
+		mds.addColumnReference(bankClients, mds.createObject(createColumn("CONTACT", "string", "Contact in case of emergency (column 22)")));
+
+		DataFile simpleExampleTable = new DataFile();
+		simpleExampleTable.setName("SimpleExampleTable");
+		simpleExampleTable.setDescription("A very simple example document referring to a local file.");
+		simpleExampleTable.setUrlString(getFileUrl("simple-example-table.csv"));
+		mds.createObject(simpleExampleTable);
+		mds.addColumnReference(simpleExampleTable, mds.createObject(createColumn("ColumnName1", "string", null)));
+		mds.addColumnReference(simpleExampleTable, mds.createObject(createColumn("ColumnName2", "int", null)));
+
+		Document simpleExampleURLDocument = new Document();
+		simpleExampleURLDocument.setName("Simple URL example document");
+		simpleExampleURLDocument.setDescription("A very simple example document referring to a publicly available URL");
+		simpleExampleURLDocument.setUrlString("https://www.wikipedia.org");
+		simpleExampleURLDocument.setEncoding("ASCII");
+		mds.createObject(simpleExampleURLDocument);
+
+		Document simpleExampleDocument = new Document();
+		simpleExampleDocument.setName("Simple local example document");
+		simpleExampleDocument.setDescription("A very simple example document referring to a local file");
+		simpleExampleDocument.setUrlString(getFileUrl("simple-example-document.txt"));
+		simpleExampleDocument.setEncoding("ASCII");
+		mds.createObject(simpleExampleDocument);
+
+		BusinessTerm bankClientTerm1 = new BusinessTerm();
+		bankClientTerm1.setName("Address");
+		bankClientTerm1.setDescription("The mail address of a person or organization");
+		bankClientTerm1.setAbbreviations(Arrays.asList(new String[] { "Addr" }));
+		bankClientTerm1.setExample("257 Great Lister Street P O BOX 1107 Birmingham");
+		bankClientTerm1.setUsage("Outgoing mail (physical).");
+		mds.createObject(bankClientTerm1);
+
+		BusinessTerm bankClientTerm2a = new BusinessTerm();
+		bankClientTerm2a.setName("Marital Status");
+		bankClientTerm2a.setDescription("The marital status of a person (single, married, divorced, or other).");
+		bankClientTerm2a.setAbbreviations(Arrays.asList(new String[] { "MS","MAST" }));
+		bankClientTerm2a.setExample("single");
+		bankClientTerm2a.setUsage("Contracting");
+		mds.createObject(bankClientTerm2a);
+
+		BusinessTerm bankClientTerm2b = new BusinessTerm();
+		bankClientTerm2b.setReference(generateMdoRef(mds));
+		bankClientTerm2b.setName("Marital Status");
+		bankClientTerm2b.setDescription("2nd term representing the marital status of a person.");
+		bankClientTerm2b.setAbbreviations(Arrays.asList(new String[] { "MS","MAST" }));
+		bankClientTerm2b.setExample("married");
+		bankClientTerm2b.setUsage("Human Resources");
+		mds.createObject(bankClientTerm2b);
+
+		BusinessTerm bankClientTerm3 = new BusinessTerm();
+		bankClientTerm3.setName("AVG Balance");
+		bankClientTerm3.setDescription("The average balance of an account over an amount of time, typically a year. Unit: Dollars.");
+		bankClientTerm3.setAbbreviations(Arrays.asList(new String[] { "AB","AVGB","AVGBAL" }));
+		bankClientTerm3.setExample("1000");
+		bankClientTerm3.setUsage("Contracting");
+		bankClientTerm3.setOriginRef("test-pointer-to-igc");
+		bankClientTerm3.setReplicaRefs(Arrays.asList(new String[] { "first-replica-pointer", "second-replica-pointer" }));
+		mds.createObject(bankClientTerm3);
+
+		BusinessTerm bankClientTerm4 = new BusinessTerm();
+		bankClientTerm4.setName("LASTNAME");
+		bankClientTerm4.setDescription("Last name of a person");
+		bankClientTerm4.setAbbreviations(Arrays.asList(new String[] { "LASTNME" }));
+		bankClientTerm4.setExample("1000");
+		bankClientTerm4.setUsage("Contracting");
+		mds.createObject(bankClientTerm4);
+
+		BusinessTerm bankClientTerm5a = new BusinessTerm();
+		bankClientTerm5a.setReference(generateMdoRef(mds));
+		bankClientTerm5a.setName("Credit Card Number");
+		bankClientTerm5a.setDescription("Credit card number of a customer");
+		bankClientTerm5a.setAbbreviations(Arrays.asList(new String[] { "CreNum", "CCN" }));
+		bankClientTerm5a.setExample("1234567");
+		bankClientTerm5a.setUsage("Contracting");
+		mds.createObject(bankClientTerm5a);
+
+		BusinessTerm bankClientTerm5b = new BusinessTerm();
+		bankClientTerm5b.setReference(generateMdoRef(mds));
+		bankClientTerm5b.setName("Credit Card Number");
+		bankClientTerm5b.setDescription("Credit card number of an employee");
+		bankClientTerm5b.setAbbreviations(Arrays.asList(new String[] {}));      // this one has no abbreviations
+		bankClientTerm5b.setExample("1234567");
+		bankClientTerm5b.setUsage("Human Resources");
+		mds.createObject(bankClientTerm5b);
+
+		BusinessTerm bankClientTermDataSetLevel = new BusinessTerm();
+		bankClientTermDataSetLevel.setName("Bank Clients");
+		bankClientTermDataSetLevel.setDescription("The only purpose of this term is to match the name of the data set BankClientsShort");
+		bankClientTermDataSetLevel.setAbbreviations(Arrays.asList(new String[] { "BC" }));
+		bankClientTermDataSetLevel.setExample("<none>");
+		bankClientTermDataSetLevel.setUsage("Integration testing of TermMatcher discovery service. Yields confidence value of 56.");
+		mds.createObject(bankClientTermDataSetLevel);
+
+		mds.commit();
+	}
+
+	/**
+	 * Returns the ODF base types that a metadata store has to support in order to be used with ODF.
+	 *
+	 * @return A new, modifiable list containing the ODF base type classes in a fixed order
+	 */
+	public static final List<Class<?>> getBaseTypes() {
+		Class<?>[] baseTypes = new Class<?>[] {
+				MetaDataObject.class,
+				DataStore.class,
+				Database.class,
+				Connection.class,
+				JDBCConnection.class,
+				DataSet.class,
+				UnknownDataSet.class,
+				RelationalDataSet.class,
+				Column.class,
+				Table.class,
+				Schema.class,
+				DataFileFolder.class,
+				DataFile.class,
+				Document.class,
+				Annotation.class,
+				ProfilingAnnotation.class,
+				ClassificationAnnotation.class,
+				RelationshipAnnotation.class,
+				BusinessTerm.class
+		};
+		// Copy into an ArrayList so that callers still receive their own growable list.
+		return new ArrayList<Class<?>>(Arrays.asList(baseTypes));
+	}
+
+	/**
+	 * Utility method that returns a connection info object for a given information asset.
+	 * <p>
+	 * Only {@link Table} assets are handled: the parent of the table is resolved as its
+	 * {@link Schema} and the parent of the schema as its {@link Database}. The connections
+	 * that the metadata store returns for that database are attached to the result.
+	 *
+	 * @param mds Metadata store used to look up parent objects and connections
+	 * @param informationAsset Asset for which the connection info is requested
+	 * @return JDBC connection info if the asset is a table, <code>null</code> for any other asset
+	 * @throws MetadataStoreException if the schema or database parent is missing or has an unexpected type
+	 */
+	public static ConnectionInfo getConnectionInfo(MetadataStore mds, MetaDataObject informationAsset) {
+		// Assets other than tables have no connection info here.
+		if (!(informationAsset instanceof Table)) {
+			return null;
+		}
+		Schema schema = getParentOfType(mds, informationAsset, Schema.class);
+		Database database = getParentOfType(mds, schema, Database.class);
+		JDBCConnectionInfo jdbcConnectionInfo = new JDBCConnectionInfo();
+		jdbcConnectionInfo.setSchemaName(schema.getName());
+		jdbcConnectionInfo.setTableName(informationAsset.getName());
+		jdbcConnectionInfo.setConnections(mds.getConnections(database));
+		jdbcConnectionInfo.setAssetReference(informationAsset.getReference());
+		return jdbcConnectionInfo;
+	}
+
+    /**
+	 * Looks up the parent of a metadata object and returns it as the requested type.
+	 *
+	 * @param mds Metadata store that is asked for the parent
+	 * @param metaDataObject Metadata object whose parent is requested
+	 * @param type Class that the parent is expected to be an instance of
+	 * @return Parent object of the given metadata object, cast to the given type
+	 * @throws MetadataStoreException if there is no parent or the parent is not an instance of the given type
+	 */
+	public static <T> T getParentOfType(MetadataStore mds, MetaDataObject metaDataObject, Class<T> type) {
+		final MetaDataObject parentObject = mds.getParent(metaDataObject);
+		if (parentObject == null) {
+			throw new MetadataStoreException(MessageFormat.format(
+					"Cannot extract connection info for object id ''{0}'' because the parent object is null.",
+					metaDataObject.getReference().getId()));
+		}
+		if (type.isInstance(parentObject)) {
+			return type.cast(parentObject);
+		}
+		// The parent exists but has an unexpected type.
+		throw new MetadataStoreException(MessageFormat.format(
+				"Parent of object ''{0}'' is expected to be of type ''{1}'' but is ''{2}''",
+				metaDataObject.getReference().getId(), type.getSimpleName(), parentObject.getClass().getName()));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/notification/DefaultNotificationManager.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/notification/DefaultNotificationManager.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/notification/DefaultNotificationManager.java
new file mode 100755
index 0000000..f2f95ff
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/notification/DefaultNotificationManager.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.notification;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Default {@link NotificationManager} that does not provide any listeners.
+ */
+public class DefaultNotificationManager implements NotificationManager {
+
+	/**
+	 * @return a new, empty list on every call
+	 */
+	@Override
+	public List<NotificationListener> getListeners() {
+		List<NotificationListener> noListeners = new ArrayList<>();
+		return noListeners;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/notification/NotificationListener.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/notification/NotificationListener.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/notification/NotificationListener.java
new file mode 100755
index 0000000..fb6c37a
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/notification/NotificationListener.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.notification;
+
+import org.apache.atlas.odf.api.OpenDiscoveryFramework;
+
+public interface NotificationListener {
+	
+	/**
+	 * A human readable name for this listener. Used for logging and management.
+	 *
+	 * @return the name of this listener
+	 */
+	String getName();
+	
+	/**
+	 * The Kafka topic to listen on.
+	 *
+	 * @return the name of the Kafka topic
+	 */
+	String getTopicName();
+
+	/**
+	 * This is called whenever an event arrives. Typically, one would initiate
+	 * some analysis request on the passed odf instance.
+	 *
+	 * @param event the event that arrived, passed as a string
+	 *              (NOTE(review): the payload format is not specified here - confirm with the caller)
+	 * @param odf the ODF instance on which, for example, an analysis request can be initiated
+	 */
+	void onEvent(String event, OpenDiscoveryFramework odf);
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/notification/NotificationManager.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/notification/NotificationManager.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/notification/NotificationManager.java
new file mode 100755
index 0000000..ce4d8ff
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/notification/NotificationManager.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.notification;
+
+import java.util.List;
+
+/**
+ * Exposes a list of {@link NotificationListener}s through {@link #getListeners()}.
+ * <p>
+ * Provide implementations in the odf-implementations.properties file(s).
+ * <p>
+ * NOTE(review): how and when the returned listeners are used is decided by the callers,
+ * which are not part of this interface.
+ *
+ */
+public interface NotificationManager {
+	
+	 List<NotificationListener> getListeners();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/settings/SettingsManagerImpl.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/settings/SettingsManagerImpl.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/settings/SettingsManagerImpl.java
new file mode 100755
index 0000000..6b33cdd
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/settings/SettingsManagerImpl.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.settings;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+import org.apache.atlas.odf.core.configuration.ConfigManager;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.api.settings.KafkaConsumerConfig;
+import org.apache.atlas.odf.api.settings.KafkaMessagingConfiguration;
+import org.apache.atlas.odf.api.settings.MessagingConfiguration;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ *
+ * External Java API for reading and updating ODF settings.
+ * <p>
+ * Settings are read from and written to the {@link ConfigManager} that is obtained
+ * from the {@link ODFInternalFactory}.
+ *
+ */
+public class SettingsManagerImpl implements SettingsManager {
+	/**
+	 * Placeholder that can be passed instead of a password when updating the settings;
+	 * see {@link #updateODFSettings(ODFSettings)}.
+	 */
+	public static final String HIDDEN_PASSWORD_IDENTIFIER = "***hidden***";
+	// Assigned exactly once in the constructor.
+	private final ConfigManager configManager;
+
+	/**
+	 * Creates a settings manager that works on the ConfigManager provided by the ODFInternalFactory.
+	 */
+	public SettingsManagerImpl() {
+		ODFInternalFactory f = new ODFInternalFactory();
+		configManager = f.create(ConfigManager.class);
+	}
+
+	/**
+	 * Retrieve Kafka consumer properties.
+	 * <p>
+	 * The properties are the top-level entries of the JSON representation of the configured
+	 * {@link KafkaConsumerConfig}; every value is converted to its string form.
+	 *
+	 * @return Current Kafka consumer properties; empty if the messaging configuration is not a
+	 *         {@link KafkaMessagingConfiguration}
+	 * @throws RuntimeException if the Kafka consumer configuration cannot be converted to JSON
+	 */
+	public Properties getKafkaConsumerProperties() {
+		Properties props = new Properties();
+		MessagingConfiguration messagingConfig = getODFSettings().getMessagingConfiguration();
+		if (!(messagingConfig instanceof KafkaMessagingConfiguration)) {
+			return props;
+		}
+		KafkaConsumerConfig config = ((KafkaMessagingConfiguration) messagingConfig).getKafkaConsumerConfig();
+		try {
+			JSONObject configJSON = JSONUtils.toJSONObject(config);
+			for (Object key : configJSON.keySet()) {
+				props.setProperty((String) key, String.valueOf(configJSON.get(key)));
+			}
+		} catch (JSONException e) {
+			throw new RuntimeException("The kafka consumer config could not be parsed!", e);
+		}
+		return props;
+	}
+
+	/**
+	 * Retrieve Kafka producer properties.
+	 *
+	 * @return Current Kafka producer properties; always the string serializers for key and value
+	 */
+	public Properties getKafkaProducerProperties() {
+		// Currently no producer properties are editable and therefore not
+		// stored in the config file
+		Properties props = new Properties();
+		props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		return props;
+	}
+
+	/**
+	 * Retrieve overall ODF settings including plain passwords.
+	 *
+	 * @return Current ODF settings
+	 */
+	public ODFSettings getODFSettings() {
+		return configManager.getConfigContainer().getOdf();
+	}
+
+	/**
+	 * Retrieve overall ODF settings with hidden passwords.
+	 *
+	 * @return Current ODF settings
+	 */
+	public ODFSettings getODFSettingsHidePasswords() {
+		return this.configManager.getConfigContainerHidePasswords().getOdf();
+	}
+
+	/**
+	 * Update ODF settings.
+	 * <p>
+	 * Passwords provided as plain text will be encrypted. If HIDDEN_PASSWORD_IDENTIFIER
+	 * is provided instead of a password, the stored password will remain unchanged.
+	 *
+	 * @param update Updated ODF settings
+	 * @throws ValidationException propagated from the underlying configuration update
+	 */
+	public void updateODFSettings(ODFSettings update) throws ValidationException {
+		ConfigContainer cont = new ConfigContainer();
+		cont.setOdf(update);
+		this.configManager.updateConfigContainer(cont);
+	}
+
+	/**
+	 * Reset ODF settings to the defaults.
+	 */
+	public void resetODFSettings() {
+		// NOTE(review): this asks the factory for a ConfigManager instead of using the configManager
+		// field like the other methods do - confirm that both refer to the same configuration.
+		new ODFInternalFactory().create(ConfigManager.class).resetConfigContainer();
+	}
+
+	/**
+	 * Retrieve user defined ODF properties.
+	 *
+	 * @return Map of user defined ODF properties
+	 */
+	public Map<String, Object> getUserDefinedConfig() {
+		return getODFSettings().getUserDefined();
+	}
+
+	/**
+	 * Update user defined ODF properties.
+	 * <p>
+	 * The map is wrapped in an otherwise empty {@link ODFSettings} object and passed to
+	 * {@link #updateODFSettings(ODFSettings)}.
+	 *
+	 * @param update Map of user defined ODF properties
+	 * @throws ValidationException propagated from {@link #updateODFSettings(ODFSettings)}
+	 */
+	public void updateUserDefined(Map<String, Object> update) throws ValidationException {
+		ODFSettings odfConfig = new ODFSettings();
+		odfConfig.setUserDefined(update);
+		updateODFSettings(odfConfig);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/store/ODFConfigurationStorage.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/store/ODFConfigurationStorage.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/store/ODFConfigurationStorage.java
new file mode 100755
index 0000000..5bfae91
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/store/ODFConfigurationStorage.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.store;
+
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+
+/**
+ * Contract for a storage that holds the ODF configuration ({@link ConfigContainer}) and
+ * keeps track of configuration changes by their change id.
+ * <p>
+ * NOTE(review): only the signatures are visible here; the notes on the individual methods are
+ * derived from their names and should be confirmed against the implementations.
+ */
+public interface ODFConfigurationStorage {
+
+	/** Stores the given configuration container. */
+	void storeConfig(ConfigContainer container);
+
+	/**
+	 * Returns the configuration held by this storage.
+	 *
+	 * @param defaultConfiguration presumably the fallback when nothing has been stored yet - TODO confirm
+	 */
+	ConfigContainer getConfig(ConfigContainer defaultConfiguration);
+
+	/** Presumably a callback for a changed configuration - TODO confirm who invokes it. */
+	void onConfigChange(ConfigContainer container);
+
+	/** Registers the change with the given id as pending. */
+	void addPendingConfigChange(String changeId);
+
+	/** Removes the change with the given id from the pending changes. */
+	void removePendingConfigChange(String changeId);
+
+	/** Tells whether the change with the given id is pending. */
+	boolean isConfigChangePending(String changeId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/internal/odf-default-implementation.properties
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/internal/odf-default-implementation.properties b/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/internal/odf-default-implementation.properties
new file mode 100755
index 0000000..c9c21d0
--- /dev/null
+++ b/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/internal/odf-default-implementation.properties
@@ -0,0 +1,30 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# default implementations
+
+AnalysisRequestTrackerStore=org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore
+ThreadManager=org.apache.atlas.odf.core.controlcenter.DefaultThreadManager
+MetadataStore=org.apache.atlas.odf.core.metadata.DefaultMetadataStore
+AnnotationStore=org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore
+JDBCMetadataImporter=org.apache.atlas.odf.core.metadata.JDBCMetadataImporterImpl
+# NOTE(review): unlike the other entries, the key below is fully qualified and its value refers to
+# the com.ibm.iis package instead of org.apache.atlas.odf - verify that this class name resolves.
+org.apache.atlas.odf.core.connectivity.DataSetRetriever=com.ibm.iis.odf.core.connectivity.DataSetRetrieverImpl
+# NOTE(review): the value below has no package prefix, unlike all other entries - confirm this is intended.
+SparkServiceExecutor=SparkServiceExecutorImpl
+Environment=org.apache.atlas.odf.core.StandaloneEnvironment
+AnalysisManager=org.apache.atlas.odf.core.analysis.AnalysisManagerImpl
+EngineManager=org.apache.atlas.odf.core.engine.EngineManagerImpl
+DiscoveryServiceManager=org.apache.atlas.odf.core.discoveryservice.DiscoveryServiceManagerImpl
+SettingsManager=org.apache.atlas.odf.core.settings.SettingsManagerImpl
+MessageEncryption=org.apache.atlas.odf.core.messaging.DefaultMessageEncryption
+TransactionContextExecutor=org.apache.atlas.odf.core.controlcenter.DefaultTransactionContextExecutor
+NotificationManager=org.apache.atlas.odf.core.notification.DefaultNotificationManager

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/internal/odf-initial-configuration.json
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/internal/odf-initial-configuration.json b/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/internal/odf-initial-configuration.json
new file mode 100755
index 0000000..0a81029
--- /dev/null
+++ b/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/internal/odf-initial-configuration.json
@@ -0,0 +1,28 @@
+{
+	"odf" : {
+		"instanceId" : "odf-default-id-CHANGEME",
+		"odfUrl" : "https://localhost:58081/odf-web-1.2.0-SNAPSHOT",
+		"odfUser" : "sdp",
+		"odfPassword" : "ZzTeX3hKtVORgks+2TaLPWxerucPBoxK",
+		"consumeMessageHubEvents" : false,
+		"discoveryServiceWatcherWaitMs": 2000,
+		"reuseRequests": true,
+		"runAnalysisOnImport": false,
+		"runNewServicesOnRegistration": false,
+		"enableAnnotationPropagation": true,
+		"messagingConfiguration": {
+			"type": "com.ibm.iis.odf.api.settings.KafkaMessagingConfiguration",
+			"analysisRequestRetentionMs": 86400000,
+			"kafkaBrokerTopicReplication": 1,
+			"queueConsumerWaitMs": 5000,
+			"kafkaConsumerConfig": {
+				"offsetsStorage": "kafka",
+				"zookeeperSessionTimeoutMs": 400,
+				"zookeeperConnectionTimeoutMs": 6000
+			}
+		},
+		"userDefined": {
+		}
+	},
+	"registeredServices" : []
+}