You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/06/28 05:57:25 UTC

[12/25] incubator-atlas git commit: ATLAS-1898: initial commit of ODF

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/sample-data-toc.properties
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/sample-data-toc.properties b/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/sample-data-toc.properties
new file mode 100755
index 0000000..2bf7347
--- /dev/null
+++ b/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/sample-data-toc.properties
@@ -0,0 +1,17 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This file contains a list of the ODF sample data files
+simple-example-table.csv=
+simple-example-document.txt=
+bank-clients-short.csv=

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/simple-example-document.txt
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/simple-example-document.txt b/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/simple-example-document.txt
new file mode 100755
index 0000000..6bdeca2
--- /dev/null
+++ b/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/simple-example-document.txt
@@ -0,0 +1 @@
+This is a simple example text.

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/simple-example-table.csv
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/simple-example-table.csv b/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/simple-example-table.csv
new file mode 100755
index 0000000..adbd1ab
--- /dev/null
+++ b/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/simple-example-table.csv
@@ -0,0 +1,4 @@
+OMColumnName1,OMColumnName2
+aaaa,1
+bbbb,2
+cccc,3

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/odfversion.txt
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/odfversion.txt b/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/odfversion.txt
new file mode 100755
index 0000000..27b38ad
--- /dev/null
+++ b/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/odfversion.txt
@@ -0,0 +1,14 @@
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+1.2.0-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreBase.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreBase.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreBase.java
new file mode 100755
index 0000000..587ae30
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreBase.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.integrationtest;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.annotation.AnnotationStoreUtils;
+import org.apache.atlas.odf.core.configuration.ConfigManager;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.core.test.discoveryservice.TestSyncDiscoveryServiceWritingAnnotations1;
+
+public class ODFAPITestWithMetadataStoreBase extends ODFTestBase {
+
+	@Before
+	public void createSampleData() throws Exception {
+		MetadataStore mds = new ODFFactory().create().getMetadataStore();
+		mds.resetAllData();
+		mds.createSampleData();
+	}
+
+	@BeforeClass
+	public static void registerServices() throws Exception {
+		ConfigContainer config = JSONUtils.readJSONObjectFromFileInClasspath(ConfigContainer.class, "org/apache/atlas/odf/core/test/internal/odf-initial-configuration.json",
+				ODFAPITestWithMetadataStoreBase.class.getClassLoader());
+		ConfigManager configManager = new ODFInternalFactory().create(ConfigManager.class);
+		configManager.updateConfigContainer(config);
+	}
+
+	protected List<MetaDataObjectReference> getTables(MetadataStore mds) {
+		List<MetaDataObjectReference> dataSets = mds.search(mds.newQueryBuilder().objectType("DataFile").build());
+		Assert.assertTrue(dataSets.size() > 0);
+		// take only maximal 5 data sets
+		int MAX_DATASETS = 5;
+		if (dataSets.size() > MAX_DATASETS) {
+			dataSets = dataSets.subList(0, MAX_DATASETS);
+		}
+		return dataSets;
+	}
+
+	public String test(String dsId, List<MetaDataObjectReference> dataSets, AnalysisRequestStatus.State expectedFinalState, boolean requestIsInvalid, String correlationId) throws Exception {
+		log.log(Level.INFO, "Testing ODF with metadata store. Discovery service Id: {0}, dataSets: {1}, expected state: {2}, correlationId: {3}, should request be invalid: {4}", new Object[] { dsId,
+				dataSets, expectedFinalState, correlationId, requestIsInvalid });
+		MetadataStore mds = new ODFFactory().create().getMetadataStore();
+		Assert.assertTrue(dataSets.size() > 0);
+
+		Assert.assertNotNull(mds);
+		AnalysisRequest request = new AnalysisRequest();
+		request.setDiscoveryServiceSequence(Collections.singletonList(dsId));
+		request.setDataSets(dataSets);
+		Map<String, Object> additionalProps = new HashMap<String, Object>();
+		additionalProps.put(TestSyncDiscoveryServiceWritingAnnotations1.REQUEST_PROPERTY_CORRELATION_ID, correlationId);
+		request.setAdditionalProperties(additionalProps);
+		AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
+		AnalysisResponse resp = analysisManager.runAnalysis(request);
+
+		log.info("Analysis started on data sets: " + dataSets + ", response: " + JSONUtils.toJSON(resp));
+		log.info("Response message: " + resp.getDetails());
+		if (requestIsInvalid) {
+			Assert.assertTrue(resp.isInvalidRequest());
+			return null;
+		}
+
+		Assert.assertFalse(resp.isInvalidRequest());
+		String id = resp.getId();
+		AnalysisRequestStatus status = null;
+		int maxPolls = 100;
+		do {
+			status = analysisManager.getAnalysisRequestStatus(id);
+			log.log(Level.INFO, "Poll request for request ID ''{0}'' (expected state: ''{3}''): state: ''{1}'', details: ''{2}''", new Object[] { id, status.getState(), status.getDetails(),
+					expectedFinalState });
+			maxPolls--;
+			Thread.sleep(1000);
+		} while (maxPolls > 0 && (status.getState() == AnalysisRequestStatus.State.ACTIVE || status.getState() == AnalysisRequestStatus.State.QUEUED));
+		log.log(Level.INFO, "Expected state: {0}, actual state: {1}", new Object[] { expectedFinalState, status.getState() });
+		Assert.assertEquals(expectedFinalState, status.getState());
+		return resp.getId();
+	}
+
+	public void checkMostRecentAnnotations(MetadataStore mds, AnnotationStore as, MetaDataObjectReference ref) {
+		Map<MetaDataObjectReference, MetaDataObject> ref2Retrieved = new HashMap<>();
+		for (Annotation annot : as.getAnnotations(ref, null)) {
+			ref2Retrieved.put(annot.getReference(), annot);
+		}
+
+		List<Annotation> mostRecentAnnotations = AnnotationStoreUtils.getMostRecentAnnotationsByType(as, ref);
+		Assert.assertNotNull(mostRecentAnnotations);
+		Assert.assertTrue(mostRecentAnnotations.size() <= ref2Retrieved.size());
+		Set<MetaDataObjectReference> mostRecentAnnoationRefs = new HashSet<>();
+		Set<String> annotationTypes = new HashSet<>();
+		for (Annotation annot : mostRecentAnnotations) {
+			// every annotation type occurs at most once
+			Assert.assertFalse( annotationTypes.contains(annot.getAnnotationType()));
+			mostRecentAnnoationRefs.add(annot.getReference());
+			annotationTypes.add(annot.getAnnotationType());
+		}
+
+		// all most recent annotations are a subset of all annotations
+		Assert.assertTrue(ref2Retrieved.keySet().containsAll(mostRecentAnnoationRefs));
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreExtendedAnnotations.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreExtendedAnnotations.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreExtendedAnnotations.java
new file mode 100755
index 0000000..f0742aa
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreExtendedAnnotations.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.integrationtest;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.test.annotation.TestSyncDiscoveryServiceWritingExtendedAnnotations.MyObject;
+import org.apache.atlas.odf.core.test.annotation.TestSyncDiscoveryServiceWritingExtendedAnnotations.MyOtherObject;
+import org.apache.atlas.odf.core.test.annotation.TestSyncDiscoveryServiceWritingExtendedAnnotations.SyncDiscoveryServiceAnnotation;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ * Integration test for the "synctestservice-with-extendedannotations" discovery service,
+ * checking the extended annotations it writes for the sample data sets.
+ */
+public class ODFAPITestWithMetadataStoreExtendedAnnotations extends ODFAPITestWithMetadataStoreBase {
+
+	/**
+	 * Runs the service until it reports FINISHED and verifies that each data set carries
+	 * exactly two {@link SyncDiscoveryServiceAnnotation}s from this run whose nested
+	 * properties are derived from {@code prop1} as the assertions below spell out.
+	 */
+	@Test
+	public void testSuccessSyncExtendedAnnotations() throws Exception {
+		MetadataStore mds = new ODFFactory().create().getMetadataStore();
+		AnnotationStore as = new ODFFactory().create().getAnnotationStore();
+		List<MetaDataObjectReference> dataSets = getTables(mds);
+
+		String requestId = test("synctestservice-with-extendedannotations", dataSets, State.FINISHED, false, null);
+		log.info("Checking if extended annotations exist for request ID: " + requestId);
+
+		for (MetaDataObjectReference dataSet : dataSets) {
+			List<SyncDiscoveryServiceAnnotation> fromThisRun = new ArrayList<>();
+			List<Annotation> stored = as.getAnnotations(dataSet, null);
+			Assert.assertTrue(stored.size() >= 2);
+
+			for (Annotation candidate : stored) {
+				Assert.assertNotNull(candidate);
+				if (!candidate.getAnalysisRun().equals(requestId)) {
+					// belongs to a different analysis run
+					continue;
+				}
+				log.info("Found annotation: " + candidate + ", json: " + JSONUtils.toJSON(candidate));
+				Assert.assertEquals(SyncDiscoveryServiceAnnotation.class, candidate.getClass());
+				SyncDiscoveryServiceAnnotation extended = (SyncDiscoveryServiceAnnotation) candidate;
+
+				// prop2 must be the hash code of prop1
+				Assert.assertNotNull(extended.getProp1());
+				Assert.assertEquals(extended.getProp1().hashCode(), extended.getProp2());
+
+				// first nesting level
+				MyObject nested = extended.getProp3();
+				Assert.assertNotNull(nested);
+				Assert.assertEquals("nested" + extended.getProp1(), nested.getAnotherProp());
+
+				// second nesting level
+				MyOtherObject nestedTwice = nested.getYetAnotherProp();
+				Assert.assertNotNull(nestedTwice);
+				Assert.assertEquals("nestedtwolevels" + extended.getProp1(), nestedTwice.getMyOtherObjectProperty());
+
+				fromThisRun.add(extended);
+			}
+			Assert.assertEquals(2, fromThisRun.size());
+			// TODO check annotations list
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreJsonAnnotation.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreJsonAnnotation.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreJsonAnnotation.java
new file mode 100755
index 0000000..e47b316
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreJsonAnnotation.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.integrationtest;
+
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ * Integration test for the "synctestservice-with-json-annotations" discovery service,
+ * comparing the JSON properties of the annotations it writes with an expected document.
+ */
+public class ODFAPITestWithMetadataStoreJsonAnnotation extends ODFAPITestWithMetadataStoreBase {
+
+	Logger logger = ODFTestLogger.get();
+
+	/** Expected JSON properties, read as UTF-8 from a classpath resource when the test instance is created. */
+	String expectedJson = Utils.getInputStreamAsString(
+			this.getClass().getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/integrationtest/metadata/internal/atlas/nested_annotation_example.json"),
+			"UTF-8");
+
+	/**
+	 * Runs the service until it reports FINISHED and verifies that every annotation written
+	 * by this run carries JSON properties equal to {@link #expectedJson}.
+	 */
+	@Test
+	public void testSuccessSyncJsonAnnotations() throws Exception {
+		MetadataStore mds = new ODFFactory().create().getMetadataStore();
+		AnnotationStore as = new ODFFactory().create().getAnnotationStore();
+		List<MetaDataObjectReference> dataSets = getTables(mds);
+
+		String requestId = test("synctestservice-with-json-annotations", dataSets, State.FINISHED, false, null);
+		log.info("Checking if annotations exist for request ID: " + requestId);
+
+		// Counts annotations of this run across all data sets. NOTE: the value is currently
+		// never asserted; the original check is kept below in disabled form.
+		int numMatchingAnnotations = 0;
+		for (MetaDataObjectReference dataSet : dataSets) {
+			List<Annotation> stored = as.getAnnotations(dataSet, null);
+			Assert.assertTrue(stored.size() >= 1);
+			for (Annotation candidate : stored) {
+				Assert.assertNotNull(candidate);
+				if (!candidate.getAnalysisRun().equals(requestId)) {
+					// belongs to a different analysis run
+					continue;
+				}
+				log.info("Found annotation: " + candidate + ", json: " + JSONUtils.toJSON(candidate));
+				String actualJson = candidate.getJsonProperties();
+				Assert.assertNotNull(actualJson);
+				logger.info("Actual annotation string: " + actualJson + ". Expected json: " + expectedJson);
+				Assert.assertEquals(expectedJson, actualJson);
+				numMatchingAnnotations++;
+			}
+//			Assert.assertEquals(1, numMatchingAnnotations);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreSimple.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreSimple.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreSimple.java
new file mode 100755
index 0000000..6b7c9b9
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreSimple.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.integrationtest;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.core.test.discoveryservice.TestSyncDiscoveryServiceWritingAnnotations1;
+import org.apache.wink.json4j.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+
+/**
+ * Integration tests that run the "asynctestservice-with-annotations" and
+ * "synctestservice-with-annotations" discovery services, once on the sample data sets and
+ * once on an invalid data set reference.
+ */
+public class ODFAPITestWithMetadataStoreSimple extends ODFAPITestWithMetadataStoreBase {
+
+	/** Logs the JVM classpath each time a test instance is created. */
+	public ODFAPITestWithMetadataStoreSimple() {
+		ODFTestBase.log.info("Classpath: " + System.getProperty("java.class.path"));
+	}
+
+	@Test
+	public void testSuccessASync() throws Exception {
+		testSuccess("asynctestservice-with-annotations");
+	}
+
+	@Test
+	public void testSuccessSync() throws Exception {
+		testSuccess("synctestservice-with-annotations");
+	}
+
+	/**
+	 * Runs the given service on the sample data sets with a fresh correlation ID and checks,
+	 * per data set, that the annotations described by
+	 * {@link TestSyncDiscoveryServiceWritingAnnotations1} were written by this run.
+	 *
+	 * @param dsId ID of the discovery service to run
+	 */
+	void testSuccess(String dsId) throws Exception {
+		MetadataStore mds = new ODFFactory().create().getMetadataStore();
+		AnnotationStore as = new ODFFactory().create().getAnnotationStore();
+		List<MetaDataObjectReference> dataSets = getTables(mds);
+		String correlationId = UUID.randomUUID().toString();
+
+		String requestId = test(dsId, dataSets, AnalysisRequestStatus.State.FINISHED, false, correlationId);
+		Thread.sleep(3000); // give time for notifications to arrive
+
+		// references of every annotation of this run; filled below but not asserted on
+		List<MetaDataObjectReference> annotationsOfThisRun = new ArrayList<>();
+
+		ODFTestBase.log.info("Checking if annotations exist");
+		for (MetaDataObjectReference dataSet : dataSets) {
+			List<Annotation> stored = as.getAnnotations(dataSet, null);
+			Assert.assertTrue(stored.size() > 0);
+
+			List<Annotation> fromThisRun = new ArrayList<>();
+			for (Annotation annot : stored) {
+				Assert.assertNotNull(annot);
+				Assert.assertNotNull(annot.getAnalysisRun());
+				if (!annot.getAnalysisRun().equals(requestId)) {
+					// belongs to a different analysis run
+					continue;
+				}
+				annotationsOfThisRun.add(annot.getReference());
+				Assert.assertNotNull(annot.getJsonProperties());
+				JSONObject props = new JSONObject(annot.getJsonProperties());
+				String annotCorrId = (String) props.get(TestSyncDiscoveryServiceWritingAnnotations1.REQUEST_PROPERTY_CORRELATION_ID);
+				if (annotCorrId != null) {
+					// an annotation carrying a correlation ID must also have a type
+					Assert.assertNotNull(annot.getAnnotationType());
+				}
+				fromThisRun.add(annot);
+			}
+
+			ODFTestBase.log.info("Checking that annotation notifications were received");
+			// at least the annotations the test service creates must be present
+			Assert.assertTrue(TestSyncDiscoveryServiceWritingAnnotations1.getNumberOfAnnotations() <= fromThisRun.size());
+			int found = countExpectedAnnotations(fromThisRun, requestId, correlationId);
+			// assert that we have found all and not more
+			Assert.assertEquals(TestSyncDiscoveryServiceWritingAnnotations1.getNumberOfAnnotations(), found);
+
+			checkMostRecentAnnotations(mds, new ODFFactory().create().getAnnotationStore(), dataSet);
+		}
+	}
+
+	/**
+	 * Counts the (expected annotation, actual annotation) pairs that agree in annotation type
+	 * and whose JSON properties carry the given correlation ID. For every such pair the
+	 * expected property value and the analysis run are asserted.
+	 *
+	 * @param annotations annotations written by the run under test
+	 * @param requestId ID every matching annotation must report as its analysis run
+	 * @param correlationId correlation ID that selects the annotations to look at
+	 * @return number of matching pairs
+	 */
+	private static int countExpectedAnnotations(List<Annotation> annotations, String requestId, String correlationId) throws Exception {
+		int matches = 0;
+		for (int i = 0; i < TestSyncDiscoveryServiceWritingAnnotations1.getNumberOfAnnotations(); i++) {
+			// as used below: [0] annotation type, [1] property name, [2] expected property value
+			String[] expected = TestSyncDiscoveryServiceWritingAnnotations1.getPropsOfNthAnnotation(i);
+			for (Annotation annotation : annotations) {
+				if (annotation.getAnnotationType() == null || !annotation.getAnnotationType().equals(expected[0])) {
+					continue;
+				}
+				JSONObject json = new JSONObject(annotation.getJsonProperties());
+				String foundCorrelationId = (String) json.get(TestSyncDiscoveryServiceWritingAnnotations1.REQUEST_PROPERTY_CORRELATION_ID);
+				// only look at those where the correlation ID property is set
+				if (!correlationId.equals(foundCorrelationId)) {
+					continue;
+				}
+				String actualValue = (String) json.get(expected[1]);
+				Assert.assertEquals(expected[2], actualValue);
+				Assert.assertEquals(requestId, annotation.getAnalysisRun());
+				// annotation type and JSON property both match
+				matches++;
+			}
+		}
+		return matches;
+	}
+
+	@Test
+	public void testFailureASync() throws Exception {
+		testFailure("asynctestservice-with-annotations");
+	}
+
+	@Test
+	public void testFailureSync() throws Exception {
+		testFailure("synctestservice-with-annotations");
+	}
+
+	/**
+	 * Submits a request for a single reference with a made-up ID and expects the response
+	 * to flag the request as invalid.
+	 *
+	 * @param dsId ID of the discovery service to run
+	 */
+	void testFailure(String dsId) throws Exception {
+		MetaDataObjectReference bogusRef = new MetaDataObjectReference();
+		bogusRef.setId("error-this-is-hopefully-an-invalid-id");
+		String correlationId = UUID.randomUUID().toString();
+		test(dsId, Collections.singletonList(bogusRef), AnalysisRequestStatus.State.ERROR, true, correlationId);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/connectivity/DataSetRetrieverTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/connectivity/DataSetRetrieverTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/connectivity/DataSetRetrieverTest.java
new file mode 100755
index 0000000..af70b5a
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/connectivity/DataSetRetrieverTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.integrationtest.connectivity;
+
+import java.sql.ResultSet;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.Table;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.connectivity.DataSetRetriever;
+import org.apache.atlas.odf.api.connectivity.DataSetRetrieverImpl;
+import org.apache.atlas.odf.api.connectivity.JDBCRetrievalResult;
+import org.apache.atlas.odf.api.discoveryservice.datasets.MaterializedDataSet;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.integrationtest.metadata.importer.JDBCMetadataImporterTest;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+
+/**
+ * Integration test for {@link DataSetRetriever}: retrieves one table both as a
+ * materialized data set and as a JDBC result set and checks that the two agree.
+ */
+public class DataSetRetrieverTest extends ODFTestBase {
+
+	static Logger logger = ODFTestLogger.get();
+
+	/** Creates the metadata store through a new {@link ODFFactory}. */
+	static MetadataStore createMetadataStore() throws Exception {
+		return new ODFFactory().create().getMetadataStore();
+	}
+
+	/** Creates the sample data and runs the JDBC test import once before the tests of this class. */
+	@BeforeClass
+	public static void setupImport() throws Exception {
+		MetadataStore mds = createMetadataStore();
+		// NOTE(review): called unconditionally here; presumably the store itself skips
+		// sample data that already exists - confirm
+		mds.createSampleData();
+		JDBCMetadataImporterTest.runTestImport(mds);
+	}
+
+	/**
+	 * Picks the first "Table" object the retriever can handle and verifies that
+	 * <ul>
+	 * <li>the materialized data set refers to that table, has one column per metadata
+	 * column, and has at least one row, each row as wide as the column list;</li>
+	 * <li>the JDBC result set has the same number of columns and rows.</li>
+	 * </ul>
+	 * The test fails if no retrievable table is found.
+	 */
+	@Test
+	public void testDataSetRetrievalJDBC() throws Exception {
+		MetadataStore ams = createMetadataStore();
+		DataSetRetriever retriever = new DataSetRetrieverImpl(ams);
+		List<MetaDataObjectReference> refs = ams.search(ams.newQueryBuilder().objectType("Table").build());
+		Assert.assertTrue(refs.size() > 0);
+		int retrievedDataSets = 0;
+		for (MetaDataObjectReference ref : refs) {
+			Table table = (Table) ams.retrieve(ref);
+			logger.info("Retrieving table: " + table.getName() + ", " + table.getReference().getUrl());
+			if (!retriever.canRetrieveDataSet(table)) {
+				continue;
+			}
+			retrievedDataSets++;
+			MaterializedDataSet dataSet = retriever.retrieveRelationalDataSet(table);
+			Assert.assertNotNull(dataSet);
+			Assert.assertEquals(table, dataSet.getTable());
+			int numberOfColumns = ams.getColumns(table).size();
+			Assert.assertEquals(numberOfColumns, dataSet.getColumns().size());
+			Assert.assertNotNull(dataSet.getData());
+			Assert.assertTrue(dataSet.getData().size() > 0);
+			for (List<Object> row : dataSet.getData()) {
+				// expected value first, so that a failure message reads correctly
+				Assert.assertEquals(numberOfColumns, row.size());
+			}
+
+			// now test JDBC method
+			JDBCRetrievalResult jdbcResult = retriever.retrieveTableAsJDBCResultSet(table);
+			// Close the result set even when an assertion fails.
+			// NOTE(review): the prepared statement stays owned by jdbcResult; how it is
+			// meant to be released is not visible from this test - confirm
+			try (ResultSet rs = jdbcResult.getPreparedStatement().executeQuery()) {
+				Assert.assertEquals(dataSet.getColumns().size(), rs.getMetaData().getColumnCount());
+				int count = 0;
+				while (rs.next()) {
+					count++;
+				}
+				Assert.assertEquals(dataSet.getData().size(), count);
+			}
+
+			// only run one test
+			break;
+		}
+		Assert.assertEquals("Number of retrieved data sets does not meet the expected value. ", 1, retrievedDataSets);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/MetadataStoreTestBase.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/MetadataStoreTestBase.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/MetadataStoreTestBase.java
new file mode 100755
index 0000000..47d3a3d
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/MetadataStoreTestBase.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.integrationtest.metadata;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.metadata.WritableMetadataStore;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.metadata.DefaultMetadataQueryBuilder;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataQueryBuilder;
+import org.apache.atlas.odf.api.metadata.models.Schema;
+import org.apache.atlas.odf.api.metadata.models.Table;
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+import org.apache.atlas.odf.api.metadata.models.RelationshipAnnotation;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.MetadataStoreException;
+import org.apache.atlas.odf.api.metadata.models.ClassificationAnnotation;
+import org.apache.atlas.odf.api.metadata.models.Column;
+import org.apache.atlas.odf.api.metadata.models.Connection;
+import org.apache.atlas.odf.api.metadata.models.DataFile;
+import org.apache.atlas.odf.api.metadata.models.DataFileFolder;
+import org.apache.atlas.odf.api.metadata.models.JDBCConnection;
+import org.apache.atlas.odf.api.metadata.models.JDBCConnectionInfo;
+import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
+import org.apache.atlas.odf.api.metadata.models.Database;
+
+public abstract class MetadataStoreTestBase {
+	private Logger logger = Logger.getLogger(MetadataStoreTestBase.class.getName());
+	private static final String analysisRun = UUID.randomUUID().toString();
+
+	protected abstract MetadataStore getMetadataStore();
+
+	/**
+	 * Obtains the metadata store via {@code new ODFFactory().create()} and returns it as a
+	 * {@link WritableMetadataStore}. Fails the running test if the store does not implement
+	 * that interface.
+	 *
+	 * @return the metadata store cast to its writable interface
+	 */
+	public static WritableMetadataStore getWritableMetadataStore() {
+		MetadataStore store = new ODFFactory().create().getMetadataStore();
+		if (store instanceof WritableMetadataStore) {
+			return (WritableMetadataStore) store;
+		}
+		String template = "The MetadataStore implementation ''{0}'' does not support the WritableMetadataStore interface.";
+		Assert.fail(MessageFormat.format(template, store.getClass()));
+		// Not reached at runtime (Assert.fail always throws); required by the compiler.
+		return null;
+	}
+
+	/**
+	 * Adds the objects the tests in this class rely on to the given store: a database with a
+	 * schema, two tables and a JDBC connection, a root folder with a nested folder holding two
+	 * files, and one profiling, one classification and one relationship annotation that all
+	 * refer to the "BankClientsShort" data file. The changes are committed at the end.
+	 *
+	 * Fails with an explicit assertion message if the "BankClientsShort" data file cannot be
+	 * found, instead of an IndexOutOfBoundsException without context.
+	 *
+	 * @param mds writable metadata store that must already contain the "BankClientsShort" data file
+	 */
+	public static void createAdditionalTestData(WritableMetadataStore mds) {
+		String bankClientsShortQuery = mds.newQueryBuilder().objectType("DataFile").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "BankClientsShort").build();
+		List<MetaDataObjectReference> bankClientsShortRefs = mds.search(bankClientsShortQuery);
+		Assert.assertTrue("Data file 'BankClientsShort' was not found in the metadata store.", bankClientsShortRefs != null && !bankClientsShortRefs.isEmpty());
+		MetaDataObjectReference bankClientsShortRef = bankClientsShortRefs.get(0);
+
+		JDBCConnection connection = new JDBCConnection();
+		connection.setName("connection1");
+
+		Table table1 = new Table();
+		table1.setName("table1");
+		Table table2 = new Table();
+		table2.setName("table2");
+
+		// Relational hierarchy: database1 -> schema1 -> { table1, table2 }, plus connection1
+		Schema schema1 = new Schema();
+		schema1.setName("schema1");
+		MetaDataObjectReference schemaRef = mds.createObject(schema1);
+		mds.addTableReference(schema1, mds.createObject(table1));
+		mds.addTableReference(schema1, mds.createObject(table2));
+
+		Database dataStore = new Database();
+		dataStore.setName("database1");
+		mds.createObject(dataStore);
+		mds.addSchemaReference(dataStore, schemaRef);
+		mds.addConnectionReference(dataStore, mds.createObject(connection));
+
+		DataFile file1 = new DataFile();
+		file1.setName("file1");
+		DataFile file2 = new DataFile();
+		file2.setName("file2");
+
+		// Folder hierarchy: rootFolder -> nestedFolder -> { file1, file2 }
+		DataFileFolder nestedFolder = new DataFileFolder();
+		nestedFolder.setName("nestedFolder");
+		MetaDataObjectReference nestedFolderRef = mds.createObject(nestedFolder);
+		mds.addDataFileReference(nestedFolder, mds.createObject(file1));
+		mds.addDataFileReference(nestedFolder, mds.createObject(file2));
+
+		DataFileFolder rootFolder = new DataFileFolder();
+		rootFolder.setName("rootFolder");
+		mds.createObject(rootFolder);
+		mds.addDataFileFolderReference(rootFolder, nestedFolderRef);
+
+		// One annotation of each kind, all tagged with the same analysis run id
+		ProfilingAnnotation pa = new ProfilingAnnotation();
+		pa.setName("A profiling annotation");
+		pa.setProfiledObject(bankClientsShortRef);
+		pa.setAnalysisRun(analysisRun);
+		mds.createObject(pa);
+
+		ClassificationAnnotation ca = new ClassificationAnnotation();
+		ca.setName("A classification annotation");
+		ca.setClassifiedObject(bankClientsShortRef);
+		ca.setAnalysisRun(analysisRun);
+		ca.setClassifyingObjects(Collections.singletonList(bankClientsShortRef));
+		mds.createObject(ca);
+
+		RelationshipAnnotation ra = new RelationshipAnnotation();
+		ra.setName("A relationship annotation");
+		ra.setRelatedObjects(Collections.singletonList(bankClientsShortRef));
+		ra.setAnalysisRun(analysisRun);
+		mds.createObject(ra);
+
+		mds.commit();
+	}
+
+	/**
+	 * Runs before every test: resets all data in the writable metadata store, lets the store
+	 * create its sample data and then adds the objects from
+	 * {@link #createAdditionalTestData(WritableMetadataStore)}.
+	 */
+	@Before
+	public void createSampleData() {
+		WritableMetadataStore writableStore = getWritableMetadataStore();
+		writableStore.resetAllData();
+		writableStore.createSampleData();
+		createAdditionalTestData(writableStore);
+	}
+
+	public static void checkQueryResults(MetadataStore mds, String[] expectedObjectNames, String searchTerm, boolean isSubset) {
+		HashSet<String> expectedResults = new HashSet<String>(Arrays.asList(expectedObjectNames));
+		List<MetaDataObjectReference> searchResult = mds.search(searchTerm);
+		Set<String> foundResults = new HashSet<>();
+		for (MetaDataObjectReference ref : searchResult) {
+			foundResults.add(mds.retrieve(ref).getName());
+		}
+		if (isSubset) {
+			String messageText = "Metadata search term ''{0}'' did not return expected subset of objects. Expected ''{1}'' but received ''{2}''.";
+			Assert.assertTrue(MessageFormat.format(messageText, new Object[] {searchTerm, expectedResults, foundResults}), foundResults.containsAll(expectedResults));
+		} else {
+			String messageText = "Metadata search term ''{0}'' did not return expected results. Expected ''{1}'' but received ''{2}''.";
+			Assert.assertTrue(MessageFormat.format(messageText, new Object[] {searchTerm, expectedResults, foundResults}), foundResults.equals(expectedResults));
+		}
+	}
+
+	/**
+	 * Compares the names of the given metadata objects with a list of expected names.
+	 *
+	 * @param expectedObjectNames names that are expected
+	 * @param referencedObjects   objects whose names are checked
+	 * @param isSubset            if true, every expected name must occur among the actual names
+	 *                            (additional actual names are tolerated); if false, both name
+	 *                            sets must be equal
+	 */
+	public static void checkReferencedObjects(String[] expectedObjectNames, List<? extends MetaDataObject> referencedObjects, boolean isSubset) {
+		HashSet<String> expectedResults = new HashSet<String>(Arrays.asList(expectedObjectNames));
+		Set<String> actualNames = new HashSet<>();
+		for (MetaDataObject obj : referencedObjects) {
+			actualNames.add(obj.getName());
+		}
+		if (isSubset) {
+			// The check is "expected is contained in actual", so the message has to say exactly that.
+			String messageText = "Expected names ''{1}'' are not a subset of actual object names ''{0}''.";
+			Assert.assertTrue(MessageFormat.format(messageText, new Object[] { actualNames, expectedResults }), actualNames.containsAll(expectedResults));
+		} else {
+			String messageText = "Actual object names ''{0}'' do not match expected names ''{1}''.";
+			Assert.assertTrue(MessageFormat.format(messageText, new Object[] { actualNames, expectedResults }), actualNames.equals(expectedResults));
+		}
+	}
+
+	/**
+	 * Verifies that an invalid query is rejected: the search must either throw a
+	 * {@link MetadataStoreException} or return null; any non-null result fails the test.
+	 *
+	 * @param mds        metadata store to query
+	 * @param searchTerm query string that is expected to be rejected
+	 */
+	void checkFailingQuery(MetadataStore mds, String searchTerm) {
+		logger.log(Level.INFO, "Checking incorrect query \"{0}\"", searchTerm);
+		List<MetaDataObjectReference> hits;
+		try {
+			hits = mds.search(searchTerm);
+		} catch (MetadataStoreException expected) {
+			logger.log(Level.INFO, "Catching expected exception.", expected);
+			return;
+		}
+		// Search must return null or throw exception
+		if (hits != null) {
+			Assert.fail(MessageFormat.format("Incorrect query \"{0}\" did not throw the expected exception.", searchTerm));
+		}
+	}
+
+	@Test
+	public void testSearchAndRetrieve() {
+		MetadataStore mds = getMetadataStore();
+		MetaDataObjectReference bankClientsShortRef = mds.search(mds.newQueryBuilder().objectType("DataFile").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "BankClientsShort").build()).get(0);
+		Assert.assertEquals("The metadata store did not retrieve the object with the expected name.", "BankClientsShort", mds.retrieve(bankClientsShortRef).getName());
+
+		// Test queries with conditions
+		checkQueryResults(mds, new String[] { "BankClientsShort" }, mds.newQueryBuilder().objectType("DataFile").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "BankClientsShort").build(), false);
+		checkQueryResults(mds, new String[] { "SimpleExampleTable", "file2", "file1"}, mds.newQueryBuilder().objectType("DataFile").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.NOT_EQUALS, "BankClientsShort").build(), false);
+		checkQueryResults(mds, new String[] { "NAME" },
+				mds.newQueryBuilder().objectType("Column").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "NAME").simpleCondition("dataType", MetadataQueryBuilder.COMPARATOR.EQUALS, "string").build(), false);
+
+		// Test type hierarchy
+		checkQueryResults(mds, new String[] { "BankClientsShort", "SimpleExampleTable" }, mds.newQueryBuilder().objectType("DataFile").build(), true);
+		checkQueryResults(mds, new String[] { "BankClientsShort", "SimpleExampleTable" }, mds.newQueryBuilder().objectType("RelationalDataSet").build(), true);
+		checkQueryResults(mds, new String[] { "BankClientsShort", "SimpleExampleTable", "Simple URL example document", "Simple local example document", "table1", "table2", "file2", "file1" }, mds.newQueryBuilder().objectType("DataSet").build(), false);
+		checkQueryResults(mds, new String[] { "BankClientsShort" }, mds.newQueryBuilder().objectType("MetaDataObject").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "BankClientsShort").build(), false);
+	}
+	
+	/**
+	 * Asserts that a query for type "DataStore" returns exactly "database1" and returns that object.
+	 *
+	 * @param mds metadata store to query
+	 * @return the first object returned by the query, cast to {@link Database}
+	 */
+	public static Database getDatabaseTestObject(MetadataStore mds) {
+		final String query = mds.newQueryBuilder().objectType("DataStore").build();
+		checkQueryResults(mds, new String[] { "database1"}, query, false);
+		MetaDataObjectReference firstHit = mds.search(query).get(0);
+		return (Database) mds.retrieve(firstHit);
+	}
+
+	/**
+	 * Asserts that a query for tables named "table1" returns exactly that name and returns the object.
+	 *
+	 * @param mds metadata store to query
+	 * @return the first object returned by the query, cast to {@link Table}
+	 */
+	public static Table getTableTestObject(MetadataStore mds) {
+		final String query = mds.newQueryBuilder().objectType("Table").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "table1").build();
+		checkQueryResults(mds, new String[] { "table1"}, query, false);
+		MetaDataObjectReference firstHit = mds.search(query).get(0);
+		return (Table) mds.retrieve(firstHit);
+	}
+
+	/**
+	 * Asserts that a query for data files named "SimpleExampleTable" returns exactly that name
+	 * and returns the object.
+	 *
+	 * @param mds metadata store to query
+	 * @return the first object returned by the query, cast to {@link DataFile}
+	 */
+	public static DataFile getDataFileTestObject(MetadataStore mds) {
+		final String query = mds.newQueryBuilder().objectType("DataFile").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "SimpleExampleTable").build();
+		checkQueryResults(mds, new String[] { "SimpleExampleTable"}, query, false);
+		MetaDataObjectReference firstHit = mds.search(query).get(0);
+		return (DataFile) mds.retrieve(firstHit);
+	}
+
+	/**
+	 * Asserts that a query for folders named "rootFolder" returns exactly that name and returns
+	 * the object.
+	 *
+	 * @param mds metadata store to query
+	 * @return the first object returned by the query, cast to {@link DataFileFolder}
+	 */
+	public static DataFileFolder getDataFileFolderTestObject(MetadataStore mds) {
+		final String query = mds.newQueryBuilder().objectType("DataFileFolder").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "rootFolder").build();
+		checkQueryResults(mds, new String[] { "rootFolder"}, query, false);
+		MetaDataObjectReference firstHit = mds.search(query).get(0);
+		return (DataFileFolder) mds.retrieve(firstHit);
+	}
+
+	public static void checkReferences(MetadataStore mds, Database database) throws Exception {
+		List<Schema> schemaList = mds.getSchemas(database);
+		MetadataStoreTestBase.checkReferencedObjects(new String[] { "schema1" }, schemaList, false);
+		List<Table> tableList = mds.getTables(schemaList.get(0));
+		MetadataStoreTestBase.checkReferencedObjects(new String[] { "table1", "table2" }, tableList, false);
+		List<Connection> connectionList = mds.getConnections(database);
+		MetadataStoreTestBase.checkReferencedObjects(new String[] { "connection1" }, connectionList, false);
+	}
+
+	public static void checkReferences(MetadataStore mds, Table table) throws Exception {
+		JDBCConnectionInfo connectionInfo = (JDBCConnectionInfo) mds.getConnectionInfo(table);
+		Assert.assertTrue("Connection is not set in connection info.", connectionInfo.getConnections().size() > 0);
+		Assert.assertEquals("Connection does not match expected name.", "connection1", connectionInfo.getConnections().get(0).getName());
+		Assert.assertEquals("Schema name of connection info does not match expected value.", "schema1", connectionInfo.getSchemaName());
+	}
+
+	public static void checkReferences(MetadataStore mds, DataFileFolder folder) throws Exception {
+		List<DataFileFolder> nestedFolderList = mds.getDataFileFolders(folder);
+		MetadataStoreTestBase.checkReferencedObjects(new String[] { "nestedFolder" }, nestedFolderList, false);
+		List<DataFile> fileList = mds.getDataFiles(nestedFolderList.get(0));
+		MetadataStoreTestBase.checkReferencedObjects(new String[] { "file1", "file2" }, fileList, false);
+	}
+
+	public static void checkReferences(MetadataStore mds, DataFile file) throws Exception {
+		List<Column> columnList = mds.getColumns(file);
+		MetadataStoreTestBase.checkReferencedObjects(new String[] { "ColumnName1", "ColumnName2" }, columnList, false);
+		MetadataStoreTestBase.checkReferencedObjects(new String[] { "SimpleExampleTable" }, Collections.singletonList(mds.getParent(columnList.get(0))), false);
+		MetadataStoreTestBase.checkReferencedObjects(new String[] { "ColumnName1", "ColumnName2" }, mds.getChildren(file), false);
+	}
+
+	/**
+	 * Looks up the database, table, folder and data file test objects one after the other
+	 * and checks the references of each.
+	 */
+	@Test
+	public void testReferences() throws Exception {
+		MetadataStore store = getMetadataStore();
+
+		Database database = getDatabaseTestObject(store);
+		checkReferences(store, database);
+
+		Table table = getTableTestObject(store);
+		checkReferences(store, table);
+
+		DataFileFolder rootFolder = getDataFileFolderTestObject(store);
+		checkReferences(store, rootFolder);
+
+		DataFile dataFile = getDataFileTestObject(store);
+		checkReferences(store, dataFile);
+	}
+
+	@Test
+	public void testErrorHandling() {
+		MetadataStore mds = getMetadataStore();
+		MetaDataObjectReference nonExistentRef = new MetaDataObjectReference();
+		nonExistentRef.setId("non-existing-reference-id");
+		nonExistentRef.setRepositoryId(mds.getRepositoryId());
+
+		Assert.assertEquals("A null value was expected when retrieving a non-existend object.", null, mds.retrieve(nonExistentRef));
+		String errorText = "Metadata search should have returned an empty result set.";
+		Assert.assertEquals(errorText,  mds.search(mds.newQueryBuilder().objectType("nonExistentType").build()), new ArrayList<MetaDataObjectReference>());
+		Assert.assertEquals(errorText,  mds.search(mds.newQueryBuilder().objectType("DataFile").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "nonExistentName").build()), new ArrayList<MetaDataObjectReference>());
+
+		if (!mds.getProperties().get(MetadataStore.STORE_PROPERTY_TYPE).equals("atlas")) {
+			// Skip this test because Atlas accepts this query as text search
+			checkFailingQuery(mds, "justAsSingleToken");
+			// Skip this test of Atlas because it does not return an error
+			String validQueryWithCondition = mds.newQueryBuilder().objectType("DataFile").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "BankClientsShort").build();
+			checkFailingQuery(mds, validQueryWithCondition + DefaultMetadataQueryBuilder.SEPARATOR_STRING + "additionalTrailingToken");
+			String validDataSetQuery = mds.newQueryBuilder().objectType("DataFile").build();
+			checkFailingQuery(mds, validDataSetQuery + DefaultMetadataQueryBuilder.SEPARATOR_STRING + "additionalTrailingToken");
+		}
+	}
+
+	@Test
+	public void testAnnotations() {
+		MetadataStore mds = getMetadataStore();
+
+		String annotationQueryString = mds.newQueryBuilder().objectType("Annotation").build();
+		checkQueryResults(mds, new String[] { "A profiling annotation", "A classification annotation", "A relationship annotation" }, annotationQueryString, false);
+		String analysisRunQuery = mds.newQueryBuilder().objectType("Annotation").simpleCondition("analysisRun", MetadataQueryBuilder.COMPARATOR.EQUALS, analysisRun).build();
+		checkQueryResults(mds, new String[] { "A profiling annotation", "A classification annotation", "A relationship annotation" }, analysisRunQuery, false);
+	}
+
+	@Test
+	public void testResetAllData() {
+		MetadataStore mds = getMetadataStore();
+		mds.resetAllData();
+		String emptyResultSet = mds.newQueryBuilder().objectType("MetaDataObject").build();
+		checkQueryResults(mds, new String[] {}, emptyResultSet, false);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/WritableMetadataStoreTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/WritableMetadataStoreTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/WritableMetadataStoreTest.java
new file mode 100755
index 0000000..5012ab3
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/WritableMetadataStoreTest.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.integrationtest.metadata;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+
+/**
+ * Runs the tests defined in {@link MetadataStoreTestBase} against the metadata store
+ * returned by {@code new ODFFactory().create().getMetadataStore()}.
+ */
+public class WritableMetadataStoreTest extends MetadataStoreTestBase{
+
+	/**
+	 * @return the metadata store the inherited tests run against
+	 */
+	@Override
+	protected MetadataStore getMetadataStore() {
+		return new ODFFactory().create().getMetadataStore();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/importer/JDBCMetadataImporterTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/importer/JDBCMetadataImporterTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/importer/JDBCMetadataImporterTest.java
new file mode 100755
index 0000000..1f00a94
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/importer/JDBCMetadataImporterTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.integrationtest.metadata.importer;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataQueryBuilder;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.importer.JDBCMetadataImportResult;
+import org.apache.atlas.odf.api.metadata.importer.JDBCMetadataImporter;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.metadata.models.JDBCConnection;
+import org.apache.atlas.odf.api.metadata.models.Schema;
+import org.apache.atlas.odf.api.metadata.models.Column;
+import org.apache.atlas.odf.api.metadata.models.Database;
+import org.apache.atlas.odf.api.metadata.models.Table;
+
+public class JDBCMetadataImporterTest extends ODFTestBase {
+	static Logger logger = Logger.getLogger(JDBCMetadataImporterTest.class.getName());
+
+	static boolean testDBRan = false;
+	public static final String SOURCE_DB1 = "DBSAMPLE1";
+	public static final String SOURCE_DB2 = "DBSAMPLE2";
+	public static final String DATABASE1_NAME = SOURCE_DB1;
+	public static final String DATABASE2_NAME =SOURCE_DB2;
+	public static final String SCHEMA1_NAME = "APP1";
+	public static final String SCHEMA2_NAME = "APP2";
+	public static final String TABLE1_NAME = "EMPLOYEE" + System.currentTimeMillis();
+	public static final String TABLE2_NAME = "EMPLOYEE_SHORT" + System.currentTimeMillis();
+
+	/**
+	 * Creates the test tables in both source databases. The {@code testDBRan} flag makes
+	 * repeated calls a no-op once the tables have been created successfully.
+	 *
+	 * @throws Exception if creating the tables fails
+	 */
+	@BeforeClass
+	public static void populateTestDB() throws Exception {
+		if (!testDBRan) {
+			createTestTables(SOURCE_DB1, SCHEMA1_NAME, TABLE1_NAME, TABLE2_NAME);
+			createTestTables(SOURCE_DB1, SCHEMA2_NAME, TABLE1_NAME, TABLE2_NAME);
+			// Switch table names so that the table named TABLE2_NAME has more columns in the SOURCE_DB2 than it has in SOURCE_DB1
+			createTestTables(SOURCE_DB2, SCHEMA1_NAME, TABLE2_NAME, TABLE1_NAME);
+			testDBRan = true;
+		}
+	}
+
+	/**
+	 * Builds the Derby JDBC URL for a database stored under {@code /tmp/odf-derby/}.
+	 * The URL carries {@code create=true}.
+	 *
+	 * @param dbName name of the database directory
+	 * @return the JDBC connection URL for that database
+	 */
+	private static String getConnectionUrl(String dbName) {
+		StringBuilder url = new StringBuilder("jdbc:derby:");
+		url.append("/tmp/odf-derby/").append(dbName);
+		url.append(";create=true");
+		return url.toString();
+	}
+
+	/**
+	 * Creates two tables in the given schema of the given database and inserts two rows into
+	 * each: {@code tableName1} gets the full set of employee columns, {@code tableName2} only
+	 * the first four of them.
+	 *
+	 * The JDBC connection and every statement are closed when they are no longer needed, so
+	 * that repeated calls do not leak database resources.
+	 *
+	 * @param dbName     name of the database, see {@link #getConnectionUrl(String)}
+	 * @param schemaName schema in which the tables are created
+	 * @param tableName1 name of the table with all columns
+	 * @param tableName2 name of the table with the reduced column set
+	 * @throws Exception if connecting or executing one of the statements fails
+	 */
+	private static void createTestTables(String dbName, String schemaName, String tableName1, String tableName2) throws Exception {
+		String[] stats = new String[] {
+		"CREATE TABLE " + schemaName + "." + tableName1 + " (\r\n" + //
+				"		EMPNO CHAR(6) NOT NULL,\r\n" + //
+				"		FIRSTNME VARCHAR(12) NOT NULL,\r\n" + //
+				"		MIDINIT CHAR(1),\r\n" + //
+				"		LASTNAME VARCHAR(15) NOT NULL,\r\n" + //
+				"		WORKDEPT CHAR(3),\r\n" + //
+				"		PHONENO CHAR(4),\r\n" + //
+				"		HIREDATE DATE,\r\n" + //
+				"		JOB CHAR(8),\r\n" + //
+				"		EDLEVEL SMALLINT NOT NULL,\r\n" + //
+				"		SEX CHAR(1),\r\n" + //
+				"		BIRTHDATE DATE,\r\n" + //
+				"		SALARY DECIMAL(9 , 2),\r\n" + //
+				"		BONUS DECIMAL(9 , 2),\r\n" + //
+				"		COMM DECIMAL(9 , 2)\r\n" + //
+				"	)",
+		"INSERT INTO " + schemaName + "." + tableName1 + " VALUES ('000010','CHRISTINE','I','HAAS','A00','3978','1995-01-01','PRES    ',18,'F','1963-08-24',152750.00,1000.00,4220.00)",
+		"INSERT INTO " + schemaName + "." + tableName1 + " VALUES ('000020','MICHAEL','L','THOMPSON','B01','3476','2003-10-10','MANAGER ',18,'M','1978-02-02',94250.00,800.00,3300.00)",
+		// Note that the 2nd table has a subset of the columns of the first table
+		"CREATE TABLE " + schemaName + "." + tableName2 + " (\r\n" + //
+				"		EMPNO CHAR(6) NOT NULL,\r\n" + //
+				"		FIRSTNME VARCHAR(12) NOT NULL,\r\n" + //
+				"		MIDINIT CHAR(1),\r\n" + //
+				"		LASTNAME VARCHAR(15) NOT NULL\r\n" + //
+				"	)",
+		"INSERT INTO " + schemaName + "." + tableName2 + " VALUES ('000010','CHRISTINE','I','HAAS')",
+		"INSERT INTO " + schemaName + "." + tableName2 + " VALUES ('000020','MICHAEL','L','THOMPSON')"
+		};
+
+		// try-with-resources closes the connection and each statement even if a statement fails
+		try (Connection conn = DriverManager.getConnection(getConnectionUrl(dbName))) {
+			for (String stat : stats) {
+				try (java.sql.Statement statement = conn.createStatement()) {
+					boolean result = statement.execute(stat);
+					logger.info("Result of statement: " + result);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Makes sure the test databases exist, imports one table through the
+	 * {@link JDBCMetadataImporter} and checks that the import result names the imported
+	 * database and the schema-qualified table.
+	 *
+	 * @param mds              metadata store (not used by this method)
+	 * @param connectionDbName database the JDBC connection points to
+	 * @param importDbName     database name passed to the importer
+	 * @param schemaName       schema of the table to import
+	 * @param tableName        table to import
+	 * @throws Exception if populating the test database or the import fails
+	 */
+	private static void runTestImport(MetadataStore mds, String connectionDbName, String importDbName, String schemaName, String tableName) throws Exception {
+		populateTestDB();
+		JDBCMetadataImporter metadataImporter = new ODFInternalFactory().create(JDBCMetadataImporter.class);
+
+		JDBCConnection jdbcConnection = new JDBCConnection();
+		jdbcConnection.setJdbcConnectionString(getConnectionUrl(connectionDbName));
+		jdbcConnection.setUser("dummyUser");
+		jdbcConnection.setPassword("dummyPassword");
+
+		JDBCMetadataImportResult result = metadataImporter.importTables(jdbcConnection, importDbName, schemaName, tableName);
+		boolean refersToDatabase = result.getDatabaseName().equals(importDbName);
+		Assert.assertTrue("JDBCMetadataImportResult does not refer to imported database.", refersToDatabase);
+		boolean refersToTable = result.getTableNames().contains(schemaName + "." + tableName);
+		Assert.assertTrue("JDBCMetadataImportResult does not refer to imported table.", refersToTable);
+	}
+
+	/**
+	 * Imports {@code TABLE1_NAME} from schema {@code SCHEMA1_NAME} of the first source database.
+	 *
+	 * @param mds metadata store handed on to the import helper
+	 * @throws Exception if the import fails
+	 */
+	public static void runTestImport(MetadataStore mds) throws Exception {
+		final String sourceDb = SOURCE_DB1;
+		final String targetDbName = DATABASE1_NAME;
+		runTestImport(mds, sourceDb, targetDbName, SCHEMA1_NAME, TABLE1_NAME);
+	}
+
+	/**
+	 * Imports tables step by step and validates the metadata store after each import:
+	 * a first table, a second table in the same schema, a table in a second schema, and
+	 * finally the second table again from a database where it has more columns.
+	 */
+	@Test
+	public void testSimpleImport() throws Exception {
+		MetadataStore store = new ODFFactory().create().getMetadataStore();
+		store.resetAllData();
+
+		List<String> expectedDatabases = new ArrayList<String>();
+		HashMap<String, List<String>> expectedSchemasForDatabase = new HashMap<String, List<String>>();
+		HashMap<String, List<String>> expectedTablesForSchema = new HashMap<String, List<String>>();
+		HashMap<String, List<String>> expectedColumnsForTable = new HashMap<String, List<String>>();
+
+		runTestImport(store, SOURCE_DB1, DATABASE1_NAME, SCHEMA1_NAME, TABLE1_NAME);
+
+		expectedDatabases.add(DATABASE1_NAME);
+		List<String> schemasOfDatabase1 = new ArrayList<String>();
+		schemasOfDatabase1.add(SCHEMA1_NAME);
+		expectedSchemasForDatabase.put(DATABASE1_NAME, schemasOfDatabase1);
+		List<String> tablesOfSchema1 = new ArrayList<String>();
+		tablesOfSchema1.add(TABLE1_NAME);
+		expectedTablesForSchema.put(SCHEMA1_NAME, tablesOfSchema1);
+		List<String> columnsOfTable1 = new ArrayList<String>(Arrays.asList("EMPNO", "FIRSTNME", "MIDINIT", "LASTNAME",
+				"WORKDEPT", "PHONENO", "HIREDATE", "JOB", "EDLEVEL", "SEX", "BIRTHDATE", "SALARY", "BONUS", "COMM"));
+		expectedColumnsForTable.put(TABLE1_NAME, columnsOfTable1);
+		validateImportedObjects(store, expectedDatabases, expectedSchemasForDatabase, expectedTablesForSchema, expectedColumnsForTable);
+
+		// Add another table to an existing schema in an existing database
+		runTestImport(store, SOURCE_DB1, DATABASE1_NAME, SCHEMA1_NAME, TABLE2_NAME);
+
+		tablesOfSchema1.add(TABLE2_NAME);
+		List<String> columnsOfTable2 = new ArrayList<String>(Arrays.asList("EMPNO", "FIRSTNME", "MIDINIT", "LASTNAME"));
+		expectedColumnsForTable.put(TABLE2_NAME, columnsOfTable2);
+		validateImportedObjects(store, expectedDatabases, expectedSchemasForDatabase, expectedTablesForSchema, expectedColumnsForTable);
+
+		// Add another schema and table to an existing database
+		runTestImport(store, SOURCE_DB1, DATABASE1_NAME, SCHEMA2_NAME, TABLE1_NAME);
+
+		schemasOfDatabase1.add(SCHEMA2_NAME);
+		List<String> tablesOfSchema2 = new ArrayList<String>();
+		tablesOfSchema2.add(TABLE1_NAME);
+		expectedTablesForSchema.put(SCHEMA2_NAME, tablesOfSchema2);
+		validateImportedObjects(store, expectedDatabases, expectedSchemasForDatabase, expectedTablesForSchema, expectedColumnsForTable);
+
+		// Import TABLE2_NAME again from SOURCE_DB2 where it has more columns than in SOURCE_DB1
+		runTestImport(store, SOURCE_DB2, DATABASE1_NAME, SCHEMA1_NAME, TABLE2_NAME);
+
+		// validate that additional columns have been added to the existing table object TABLE2_NAME.
+		columnsOfTable2.addAll(Arrays.asList("WORKDEPT", "PHONENO", "HIREDATE", "JOB", "EDLEVEL", "SEX", "BIRTHDATE", "SALARY", "BONUS", "COMM"));
+		validateImportedObjects(store, expectedDatabases, expectedSchemasForDatabase, expectedTablesForSchema, expectedColumnsForTable);
+	}
+
+	private void validateImportedObjects(MetadataStore mds, List<String> expectedDatabases, HashMap<String, List<String>> expectedSchemasForDatabase, HashMap<String,
+			List<String>> expectedTablesForSchema, HashMap<String, List<String>> expectedColumnsForTable) throws Exception{
+		for (String dbName : expectedDatabases) {
+			String query = mds.newQueryBuilder().objectType("Database").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, dbName).build();
+			List<MetaDataObjectReference> dbs = mds.search(query);
+			Assert.assertEquals("Number of databases does not match expected value.", 1, dbs.size());
+			Database database = (Database) mds.retrieve(dbs.get(0));
+			logger.log(Level.INFO, MessageFormat.format("Reference ''{0}''.", JSONUtils.toJSON(database)));
+			int numberOfMatchingConnections = 0;
+			for (org.apache.atlas.odf.api.metadata.models.Connection con : mds.getConnections(database)) {
+				if (getConnectionUrl(database.getName()).equals(((JDBCConnection) mds.retrieve(con.getReference())).getJdbcConnectionString())) {
+					numberOfMatchingConnections++;
+				}
+			}
+			Assert.assertEquals("Number of matching JDBC connections does not match expected value.", 1, numberOfMatchingConnections);
+			List<String> actualSchemaNames = new ArrayList<String>();
+			for (Schema schema : mds.getSchemas(database)) {
+				actualSchemaNames.add(schema.getName());
+
+				List<String> actualTableNames = new ArrayList<String>();
+				for (Table table : mds.getTables(schema)) {
+					actualTableNames.add(table.getName());
+
+					List<String> actualColumnNames = new ArrayList<String>();
+					for (Column column : mds.getColumns(table)) {
+						actualColumnNames.add(column.getName());
+					}
+					Assert.assertTrue("Expected columns are missing from metadata store.", actualColumnNames.containsAll(expectedColumnsForTable.get(table.getName())));
+					Assert.assertTrue("Importer has not imported all expected columns.", expectedColumnsForTable.get(table.getName()).containsAll(actualColumnNames));
+				}
+				Assert.assertTrue("Expected tables are missing from metadata store.", actualTableNames.containsAll(expectedTablesForSchema.get(schema.getName())));
+				Assert.assertTrue("Importer has not imported all expected tables.", expectedTablesForSchema.get(schema.getName()).containsAll(actualTableNames));
+			}
+			Assert.assertTrue("Expected schemas are missing from metadata store.", actualSchemaNames.containsAll(expectedSchemasForDatabase.get(database.getName())));
+			Assert.assertTrue("Importer has not imported all expected schemas.", expectedSchemasForDatabase.get(database.getName()).containsAll(actualSchemaNames));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/internal/spark/SparkDiscoveryServiceLocalTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/internal/spark/SparkDiscoveryServiceLocalTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/internal/spark/SparkDiscoveryServiceLocalTest.java
new file mode 100755
index 0000000..ec0aa9a
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/internal/spark/SparkDiscoveryServiceLocalTest.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.integrationtest.metadata.internal.spark;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.metadata.MetadataQueryBuilder;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.metadata.models.DataFile;
+import org.apache.atlas.odf.api.metadata.models.DataSet;
+import org.apache.atlas.odf.api.metadata.models.RelationalDataSet;
+import org.apache.atlas.odf.api.metadata.models.Table;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint.SERVICE_INTERFACE_TYPE;
+import org.apache.atlas.odf.api.settings.SparkConfig;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+
+public class SparkDiscoveryServiceLocalTest extends ODFTestBase {
+	protected static Logger logger = Logger.getLogger(SparkDiscoveryServiceLocalTest.class.getName());
+	public static int WAIT_MS_BETWEEN_POLLING = 2000; // pause between two status polls, in milliseconds
+	public static int MAX_NUMBER_OF_POLLS = 400; // upper bound for status polls per analysis request
+	public static String DISCOVERY_SERVICE_ID = "spark-summary-statistics-example-service";
+	public static String DASHDB_DB = "BLUDB";
+	public static String DASHDB_SCHEMA = "SAMPLES";
+	public static String DASHDB_TABLE = "CUST_RETENTION_LIFE_DURATION";
+	public static enum DATASET_TYPE {
+		FILE, TABLE
+	}
+
+	/** Creates the sample data unless the metadata store already contains a DataFile named "BankClientsShort". */
+	@BeforeClass
+	public static void createSampleData() throws Exception {
+		MetadataStore mds = new ODFFactory().create().getMetadataStore();
+		if (mds.search(mds.newQueryBuilder().objectType("DataFile").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "BankClientsShort").build()).size() == 0) {
+			mds.createSampleData();
+		}
+	}
+
+	/** Returns a Spark configuration whose cluster master URL is "local". */
+	public static SparkConfig getLocalSparkConfig() {
+		SparkConfig config = new SparkConfig();
+		config.setClusterMasterUrl("local");
+		return config;
+	}
+
+	/** Returns the registration for the SummaryStatistics application, called with input method DataFrame. */
+	public static DiscoveryServiceProperties getSparkSummaryStatisticsService() throws JSONException {
+		return createSparkServiceProperties("org.apache.atlas.odf.core.spark.SummaryStatistics", SERVICE_INTERFACE_TYPE.DataFrame);
+	}
+
+	/** Returns the registration for the SparkDiscoveryServiceExample application, called with input method Generic. */
+	public static DiscoveryServiceProperties getSparkDiscoveryServiceExample() throws JSONException {
+		return createSparkServiceProperties("org.apache.atlas.odf.core.spark.SparkDiscoveryServiceExample", SERVICE_INTERFACE_TYPE.Generic);
+	}
+
+	/** Builds the registration both example services share; only the Spark application class and the input method differ. */
+	private static DiscoveryServiceProperties createSparkServiceProperties(String className, SERVICE_INTERFACE_TYPE inputMethod) throws JSONException {
+		DiscoveryServiceProperties dsProperties = new DiscoveryServiceProperties();
+		dsProperties.setId(DISCOVERY_SERVICE_ID);
+		dsProperties.setName("Spark summary statistics service");
+		dsProperties.setDescription("Example discovery service calling summary statistics Spark application");
+		dsProperties.setCustomDescription("");
+		dsProperties.setIconUrl("spark.png");
+		dsProperties.setLink("http://www.spark.apache.org");
+		dsProperties.setPrerequisiteAnnotationTypes(null);
+		dsProperties.setResultingAnnotationTypes(null);
+		dsProperties.setSupportedObjectTypes(null);
+		dsProperties.setAssignedObjectTypes(null);
+		dsProperties.setAssignedObjectCandidates(null);
+		dsProperties.setParallelismCount(2);
+		DiscoveryServiceSparkEndpoint endpoint = new DiscoveryServiceSparkEndpoint();
+		endpoint.setJar("META-INF/spark/odf-spark-example-application-1.2.0-SNAPSHOT.jar");
+		endpoint.setClassName(className);
+		endpoint.setInputMethod(inputMethod);
+		dsProperties.setEndpoint(JSONUtils.convert(endpoint, DiscoveryServiceEndpoint.class));
+		return dsProperties;
+	}
+
+	/** Returns the DataFile named "BankClientsShort"; the assertion fails if the metadata store has none. */
+	public static DataFile getTestDataFile(MetadataStore mds) {
+		DataFile dataSet = null;
+		List<MetaDataObjectReference> refs = mds.search(mds.newQueryBuilder().objectType("DataFile").build());
+		for (MetaDataObjectReference ref : refs) {
+			DataFile file = (DataFile) mds.retrieve(ref);
+			// Constant first, so that an object without a name does not end the search with a NullPointerException
+			if ("BankClientsShort".equals(file.getName())) {
+				dataSet = file;
+				break;
+			}
+		}
+		Assert.assertNotNull(dataSet);
+		logger.log(Level.INFO, "Testing Spark discovery service on metadata object {0} (ref: {1})", new Object[] { dataSet.getName(), dataSet.getReference() });
+		return dataSet;
+	}
+
+	/** Returns the Table named DASHDB_TABLE; the assertion fails if the metadata store has none. */
+	public static Table getTestTable(MetadataStore mds) {
+		Table dataSet = null;
+		List<MetaDataObjectReference> refs = mds.search(mds.newQueryBuilder().objectType("Table").build());
+		for (MetaDataObjectReference ref : refs) {
+			Table table = (Table) mds.retrieve(ref);
+			if (DASHDB_TABLE.equals(table.getName())) {
+				dataSet = table;
+				break;
+			}
+		}
+		Assert.assertNotNull(dataSet);
+		logger.log(Level.INFO, "Testing Spark discovery service on metadata object {0} (ref: {1})", new Object[] { dataSet.getName(), dataSet.getReference() });
+		return dataSet;
+	}
+
+	/** Builds a request that runs the service DISCOVERY_SERVICE_ID on the given data set only. */
+	public static AnalysisRequest getSparkAnalysisRequest(DataSet dataSet) {
+		AnalysisRequest request = new AnalysisRequest();
+		List<MetaDataObjectReference> dataSetRefs = new ArrayList<>();
+		dataSetRefs.add(dataSet.getReference());
+		request.setDataSets(dataSetRefs);
+		request.setDiscoveryServiceSequence(Arrays.asList(DISCOVERY_SERVICE_ID));
+		return request;
+	}
+
+	/** Applies sparkConfig, (re-)registers regInfo, runs it on the selected test data set and asserts that the request finishes with at least one annotation; annotationNames is not evaluated yet. */
+	public void runSparkServiceTest(SparkConfig sparkConfig, DATASET_TYPE dataSetType, DiscoveryServiceProperties regInfo, String[] annotationNames) throws Exception{
+		logger.info("Using Spark configuration: " + JSONUtils.toJSON(sparkConfig));
+		SettingsManager config = new ODFFactory().create().getSettingsManager();
+		ODFSettings settings = config.getODFSettings();
+		settings.setSparkConfig(sparkConfig);
+		config.updateODFSettings(settings);
+
+		logger.info("Using discovery service: " + JSONUtils.toJSON(regInfo));
+		DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager();
+		AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
+		try {
+			discoveryServicesManager.deleteDiscoveryService(DISCOVERY_SERVICE_ID);
+		} catch(ServiceNotFoundException e) {
+			// Ignore exception because service may not exist
+		}
+		discoveryServicesManager.createDiscoveryService(regInfo);
+
+		MetadataStore mds = new ODFFactory().create().getMetadataStore();
+		Assert.assertNotNull(mds);
+		AnnotationStore as = new ODFFactory().create().getAnnotationStore();
+		Assert.assertNotNull(as);
+
+		RelationalDataSet dataSet = null;
+		if (dataSetType == DATASET_TYPE.FILE) {
+			dataSet = getTestDataFile(mds);
+		} else if (dataSetType == DATASET_TYPE.TABLE) {
+			dataSet = getTestTable(mds);
+		} else {
+			Assert.fail();
+		}
+		logger.info("Using dataset: " + JSONUtils.toJSON(dataSet));
+		AnalysisRequest request = getSparkAnalysisRequest(dataSet);
+		logger.info("Using analysis request: " + JSONUtils.toJSON(request));
+
+		logger.info("Starting analysis...");
+		AnalysisResponse response = analysisManager.runAnalysis(request);
+		Assert.assertNotNull(response);
+		String requestId = response.getId();
+		Assert.assertNotNull(requestId);
+		logger.info("Request id is " + requestId + ".");
+
+		logger.info("Waiting for request to finish");
+		AnalysisRequestStatus status = waitForRequest(analysisManager, requestId);
+		Assert.assertEquals(AnalysisRequestStatus.State.FINISHED, status.getState());
+
+		List<Annotation> annots = as.getAnnotations(null, status.getRequest().getId());
+		logger.info("Number of annotations created: " + annots.size());
+		Assert.assertTrue("No annotations have been created.", annots.size() > 0);
+		logger.log(Level.INFO, "Request ''{0}'' is finished.", requestId);
+		discoveryServicesManager.deleteDiscoveryService(DISCOVERY_SERVICE_ID);
+	}
+
+	/** Polls the request status until the state is no longer ACTIVE, QUEUED or NOT_FOUND, the poll limit is reached or the thread is interrupted; returns the last status seen. */
+	private static AnalysisRequestStatus waitForRequest(AnalysisManager analysisManager, String requestId) {
+		int remainingPolls = MAX_NUMBER_OF_POLLS;
+		while (true) {
+			AnalysisRequestStatus status = analysisManager.getAnalysisRequestStatus(requestId);
+			logger.log(Level.INFO, "Poll request for request ID ''{0}'', state: ''{1}'', details: ''{2}''", new Object[] { requestId, status.getState(), status.getDetails() });
+			AnalysisRequestStatus.State state = status.getState();
+			if (state != AnalysisRequestStatus.State.ACTIVE && state != AnalysisRequestStatus.State.QUEUED && state != AnalysisRequestStatus.State.NOT_FOUND) {
+				return status;
+			}
+			if (--remainingPolls <= 0) {
+				logger.log(Level.INFO, "Request ''{0}'' is not finished yet, don't wait for it", requestId);
+				return status;
+			}
+			try {
+				Thread.sleep(WAIT_MS_BETWEEN_POLLING); // reached only while another poll follows
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt(); // keep the interruption visible to the caller
+				logger.log(Level.INFO, "Exception thrown: ", e);
+				return status;
+			}
+		}
+	}
+
+	@Test
+	public void testLocalSparkClusterWithLocalDataFile() throws Exception{
+		runSparkServiceTest(getLocalSparkConfig(), DATASET_TYPE.FILE, getSparkSummaryStatisticsService(), new String[] { "SparkSummaryStatisticsAnnotation", "SparkTableAnnotation" });
+	}
+
+	@Test
+	public void testLocalSparkClusterWithLocalDataFileAndDiscoveryServiceRequest() throws Exception{
+		runSparkServiceTest(getLocalSparkConfig(), DATASET_TYPE.FILE, getSparkDiscoveryServiceExample(), new String[] { "SparkSummaryStatisticsAnnotation", "SparkTableAnnotation" });
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/models/CachedMetadataStoreTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/models/CachedMetadataStoreTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/models/CachedMetadataStoreTest.java
new file mode 100755
index 0000000..4168b0e
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/models/CachedMetadataStoreTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.integrationtest.metadata.models;
+
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.integrationtest.metadata.MetadataStoreTestBase;
+import org.apache.atlas.odf.core.metadata.WritableMetadataStore;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.metadata.models.CachedMetadataStore;
+import org.apache.atlas.odf.api.metadata.models.DataFile;
+import org.apache.atlas.odf.api.metadata.models.DataFileFolder;
+import org.apache.atlas.odf.api.metadata.models.Database;
+import org.apache.atlas.odf.api.metadata.models.Table;
+import org.apache.atlas.odf.core.test.TimerTestBase;
+
+public class CachedMetadataStoreTest extends TimerTestBase {
+	static protected Logger logger = ODFTestLogger.get();
+
+	/** Runs the reference checks through a CachedMetadataStore; only a subset of the metadata store test cases is used because the MetaDataCache does not support queries. */
+	@Test
+	public void testMetaDataCache() throws Exception {
+		WritableMetadataStore store = MetadataStoreTestBase.getWritableMetadataStore();
+		store.resetAllData();
+		store.createSampleData();
+		MetadataStoreTestBase.createAdditionalTestData(store);
+
+		Database database = MetadataStoreTestBase.getDatabaseTestObject(store);
+		CachedMetadataStore databaseCache = new CachedMetadataStore(CachedMetadataStore.retrieveMetaDataCache(store, database));
+		MetadataStoreTestBase.checkReferences(databaseCache, database);
+		Table table = MetadataStoreTestBase.getTableTestObject(store);
+		CachedMetadataStore tableCache = new CachedMetadataStore(CachedMetadataStore.retrieveMetaDataCache(store, table));
+		MetadataStoreTestBase.checkReferences(tableCache, table);
+		DataFileFolder folder = MetadataStoreTestBase.getDataFileFolderTestObject(store);
+		CachedMetadataStore folderCache = new CachedMetadataStore(CachedMetadataStore.retrieveMetaDataCache(store, folder));
+		MetadataStoreTestBase.checkReferences(folderCache, folder);
+		// NOTE(review): unlike the three checks above, the data file is checked against the writable store itself, not a cache - confirm this is intended
+		DataFile file = MetadataStoreTestBase.getDataFileTestObject(store);
+		MetadataStoreTestBase.checkReferences(store, file);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFInternalFactoryTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFInternalFactoryTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFInternalFactoryTest.java
new file mode 100755
index 0000000..75d41c5
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFInternalFactoryTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.controlcenter.ExecutorServiceFactory;
+import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
+import org.junit.Test;
+
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.notification.NotificationManager;
+
+public class ODFInternalFactoryTest extends TimerTestBase {
+
+	Logger logger = ODFTestLogger.get();
+
+	/**
+	 * Asks {@link ODFInternalFactory} for an object of each listed type and
+	 * asserts that none of the results is null. Any exception thrown by the
+	 * factory propagates, so JUnit reports it together with its stack trace.
+	 */
+	@Test
+	public void testFactoryInstantiations() throws Exception {
+		ODFInternalFactory factory = new ODFInternalFactory();
+		// Each type is listed exactly once
+		Class<?>[] interfaces = new Class<?>[] { //
+				DiscoveryServiceQueueManager.class, //
+				ControlCenter.class, //
+				AnalysisRequestTrackerStore.class, //
+				ThreadManager.class, //
+				ExecutorServiceFactory.class, //
+				NotificationManager.class, //
+		};
+		for (Class<?> cl : interfaces) {
+			Object o = factory.create(cl);
+			assertNotNull("Factory returned null for " + cl.getName(), o);
+			logger.info("Object created for class " + cl.getName() + ": " + o.getClass().getName());
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestBase.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestBase.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestBase.java
new file mode 100755
index 0000000..867f0a9
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestBase.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test;
+
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.engine.SystemHealth;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.engine.EngineManager;
+
+/**
+ * All JUnit test cases that require proper Kafka setup should inherit from this class.
+ * 
+ *
+ */
+public class ODFTestBase extends TimerTestBase {
+
+	static protected Logger log = ODFTestLogger.get();
+	@Test
+	public void testHealth() {
+		testHealth(true);
+	}
+
+	private void testHealth(boolean kafkaRunning) {
+		log.info("Starting health check...");
+		EngineManager engineManager = new ODFFactory().create().getEngineManager();
+		SystemHealth health = engineManager.checkHealthStatus();
+		if (!kafkaRunning) {
+			Assert.assertEquals(SystemHealth.HealthStatus.ERROR, health.getStatus());
+		} else {
+			Assert.assertEquals(SystemHealth.HealthStatus.OK, health.getStatus());
+		}
+		log.info("Health check finished");
+	}
+
+	@BeforeClass
+	public static void startup() throws Exception {
+		TestEnvironment.startAll();
+	}
+
+	@Before
+	public void setup() throws Exception {
+		testHealth(true);
+	}
+
+	@After
+	public void tearDown() throws Exception {
+		testHealth(true);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestLogger.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestLogger.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestLogger.java
new file mode 100755
index 0000000..a845157
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestLogger.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test;
+
+import java.util.logging.Logger;
+
+public class ODFTestLogger {
+	/** Returns the java.util.logging logger that is registered under the name of this class. */
+	public static Logger get() {
+		final String loggerName = ODFTestLogger.class.getName();
+		return Logger.getLogger(loggerName);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestcase.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestcase.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestcase.java
new file mode 100755
index 0000000..525dc83
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestcase.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test;
+
+import org.junit.BeforeClass;
+
+import org.apache.atlas.odf.api.ODFFactory;
+
+public class ODFTestcase extends TimerTestBase {
+	@BeforeClass
+	public static void setupBeforeClass() {
+		TestEnvironment.startAll();
+		// Initialize analysis manager: the returned object is not used, only the call itself matters here
+		new ODFFactory().create().getAnalysisManager();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TestEnvironment.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TestEnvironment.java
new file mode 100755
index 0000000..06d407e
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TestEnvironment.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInitializer;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+
+/**
+ * The class can be used to start components required for testing.
+ *
+ *
+ */
+public class TestEnvironment {
+	static Logger logger = Logger.getLogger(TestEnvironment.class.getName());
+
+	public static String MESSAGING_CLASS = "org.apache.atlas.odf.core.test.messaging.kafka.TestEnvironmentMessagingInitializer";
+
+	/** Loads className, instantiates it through its no-argument constructor and returns it as clazz; logs a warning and returns null on any failure. */
+	public static <T> T createObject(String className, Class<T> clazz) {
+		try {
+			Class<?> tei = TestEnvironment.class.getClassLoader().loadClass(className);
+			// clazz.cast checks the type inside the try block; an unchecked (T) cast would only fail later at the caller
+			return clazz.cast(tei.getDeclaredConstructor().newInstance());
+		} catch (Exception exc) {
+			logger.log(Level.WARNING, "An exception occurred when starting the messaging test environment", exc);
+		}
+		return null;
+	}
+
+	/** Creates the initializer with the given class name and starts it; does nothing if it could not be created. */
+	public static void start(String className) {
+		TestEnvironmentInitializer initializer = createObject(className, TestEnvironmentInitializer.class);
+		if (initializer != null) {
+			initializer.start();
+		}
+	}
+
+	/** Starts the initializer named by MESSAGING_CLASS unless the property odf.dont.start.messaging equals "true". */
+	public static void startMessaging() {
+		if ("true".equals(new ODFInternalFactory().create(Environment.class).getProperty("odf.dont.start.messaging"))) {
+			logger.info("Messaging test environment not started because environment variable odf.dont.start.messaging is set");
+		} else {
+			start(MESSAGING_CLASS);
+		}
+	}
+
+	/** Calls startMessaging() and then ODFInitializer.start(). */
+	public static void startAll() {
+		startMessaging();
+		ODFInitializer.start();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TestEnvironmentInitializer.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TestEnvironmentInitializer.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TestEnvironmentInitializer.java
new file mode 100755
index 0000000..b4a0022
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TestEnvironmentInitializer.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test;
+
+public interface TestEnvironmentInitializer {
+	void start(); // called by TestEnvironment.start(String) once the initializer has been created by class name
+	
+	void stop(); // presumably the counterpart of start(); no caller is visible in this file - TODO confirm
+	
+	String getName(); // presumably a name identifying the started component - TODO confirm
+}