You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/06/28 05:57:25 UTC
[12/25] incubator-atlas git commit: ATLAS-1898: initial commit of ODF
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/sample-data-toc.properties
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/sample-data-toc.properties b/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/sample-data-toc.properties
new file mode 100755
index 0000000..2bf7347
--- /dev/null
+++ b/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/sample-data-toc.properties
@@ -0,0 +1,17 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This file contains a list of the ODF sample data files
+simple-example-table.csv=
+simple-example-document.txt=
+bank-clients-short.csv=
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/simple-example-document.txt
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/simple-example-document.txt b/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/simple-example-document.txt
new file mode 100755
index 0000000..6bdeca2
--- /dev/null
+++ b/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/simple-example-document.txt
@@ -0,0 +1 @@
+This is a simple example text.
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/simple-example-table.csv
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/simple-example-table.csv b/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/simple-example-table.csv
new file mode 100755
index 0000000..adbd1ab
--- /dev/null
+++ b/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/metadata/internal/sampledata/simple-example-table.csv
@@ -0,0 +1,4 @@
+OMColumnName1,OMColumnName2
+aaaa,1
+bbbb,2
+cccc,3
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/odfversion.txt
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/odfversion.txt b/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/odfversion.txt
new file mode 100755
index 0000000..27b38ad
--- /dev/null
+++ b/odf/odf-core/src/main/resources/org/apache/atlas/odf/core/odfversion.txt
@@ -0,0 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+1.2.0-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreBase.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreBase.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreBase.java
new file mode 100755
index 0000000..587ae30
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreBase.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.integrationtest;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.annotation.AnnotationStoreUtils;
+import org.apache.atlas.odf.core.configuration.ConfigManager;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.core.test.discoveryservice.TestSyncDiscoveryServiceWritingAnnotations1;
+
+/**
+ * Base class for integration tests that run analysis requests against the metadata store.
+ * <p>
+ * Loads the initial test configuration once per class, resets the metadata store and creates the
+ * sample data before each test, and provides helpers to run a request and to check annotations.
+ */
+public class ODFAPITestWithMetadataStoreBase extends ODFTestBase {
+
+    /** Upper bound on the number of data sets returned by {@link #getTables(MetadataStore)}. */
+    private static final int MAX_DATASETS = 5;
+
+    /** Maximum number of status polls performed by {@link #test} before it stops waiting. */
+    private static final int MAX_POLLS = 100;
+
+    /** Pause after each status poll, in milliseconds. */
+    private static final long POLL_INTERVAL_MILLIS = 1000L;
+
+    /** Runs before each test: resets all data in the metadata store and creates the sample data. */
+    @Before
+    public void createSampleData() throws Exception {
+        MetadataStore mds = new ODFFactory().create().getMetadataStore();
+        mds.resetAllData();
+        mds.createSampleData();
+    }
+
+    /** Runs once per class: reads the initial test configuration from the classpath and applies it. */
+    @BeforeClass
+    public static void registerServices() throws Exception {
+        ConfigContainer config = JSONUtils.readJSONObjectFromFileInClasspath(ConfigContainer.class,
+                "org/apache/atlas/odf/core/test/internal/odf-initial-configuration.json",
+                ODFAPITestWithMetadataStoreBase.class.getClassLoader());
+        ConfigManager configManager = new ODFInternalFactory().create(ConfigManager.class);
+        configManager.updateConfigContainer(config);
+    }
+
+    /**
+     * Searches the metadata store for objects of type "DataFile".
+     *
+     * @param mds metadata store to search
+     * @return the references found, limited to the first {@value #MAX_DATASETS}; never empty
+     */
+    protected List<MetaDataObjectReference> getTables(MetadataStore mds) {
+        List<MetaDataObjectReference> dataSets = mds.search(mds.newQueryBuilder().objectType("DataFile").build());
+        Assert.assertFalse("No object of type 'DataFile' found in the metadata store", dataSets.isEmpty());
+        // keep the test run short: use at most MAX_DATASETS data sets
+        if (dataSets.size() > MAX_DATASETS) {
+            return dataSets.subList(0, MAX_DATASETS);
+        }
+        return dataSets;
+    }
+
+    /**
+     * Submits an analysis request for a single discovery service and waits for its outcome.
+     *
+     * @param dsId               ID of the discovery service to run
+     * @param dataSets           data sets to analyze; must not be empty
+     * @param expectedFinalState state the request is expected to have once polling stops
+     * @param requestIsInvalid   whether the request is expected to be rejected as invalid
+     * @param correlationId      value passed to the service as an additional request property
+     * @return the ID of the analysis request, or {@code null} if the request was expected to be invalid
+     * @throws Exception if running or polling the request fails
+     */
+    public String test(String dsId, List<MetaDataObjectReference> dataSets, AnalysisRequestStatus.State expectedFinalState, boolean requestIsInvalid, String correlationId) throws Exception {
+        log.log(Level.INFO, "Testing ODF with metadata store. Discovery service Id: {0}, dataSets: {1}, expected state: {2}, correlationId: {3}, should request be invalid: {4}",
+                new Object[] { dsId, dataSets, expectedFinalState, correlationId, requestIsInvalid });
+        MetadataStore mds = new ODFFactory().create().getMetadataStore();
+        Assert.assertFalse("At least one data set is required", dataSets.isEmpty());
+        Assert.assertNotNull("Metadata store must be available", mds);
+
+        AnalysisRequest request = new AnalysisRequest();
+        request.setDiscoveryServiceSequence(Collections.singletonList(dsId));
+        request.setDataSets(dataSets);
+        Map<String, Object> additionalProps = new HashMap<String, Object>();
+        additionalProps.put(TestSyncDiscoveryServiceWritingAnnotations1.REQUEST_PROPERTY_CORRELATION_ID, correlationId);
+        request.setAdditionalProperties(additionalProps);
+        AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
+        AnalysisResponse resp = analysisManager.runAnalysis(request);
+
+        log.info("Analysis started on data sets: " + dataSets + ", response: " + JSONUtils.toJSON(resp));
+        log.info("Response message: " + resp.getDetails());
+        if (requestIsInvalid) {
+            Assert.assertTrue("Request was expected to be rejected as invalid", resp.isInvalidRequest());
+            return null;
+        }
+        Assert.assertFalse("Request was unexpectedly rejected as invalid: " + resp.getDetails(), resp.isInvalidRequest());
+
+        String id = resp.getId();
+        AnalysisRequestStatus status;
+        int remainingPolls = MAX_POLLS;
+        // poll until the request leaves the ACTIVE/QUEUED states or the poll budget is used up
+        do {
+            status = analysisManager.getAnalysisRequestStatus(id);
+            log.log(Level.INFO, "Poll request for request ID ''{0}'' (expected state: ''{3}''): state: ''{1}'', details: ''{2}''",
+                    new Object[] { id, status.getState(), status.getDetails(), expectedFinalState });
+            remainingPolls--;
+            Thread.sleep(POLL_INTERVAL_MILLIS);
+        } while (remainingPolls > 0 && isPending(status.getState()));
+        log.log(Level.INFO, "Expected state: {0}, actual state: {1}", new Object[] { expectedFinalState, status.getState() });
+        // a request that is still ACTIVE/QUEUED here ran out of polls; the message makes that visible
+        Assert.assertEquals("Unexpected state of request '" + id + "' after at most " + MAX_POLLS + " polls", expectedFinalState, status.getState());
+        return id;
+    }
+
+    /** Returns whether the given state means the request is still active or queued. */
+    private static boolean isPending(AnalysisRequestStatus.State state) {
+        return state == AnalysisRequestStatus.State.ACTIVE || state == AnalysisRequestStatus.State.QUEUED;
+    }
+
+    /**
+     * Checks the result of {@link AnnotationStoreUtils#getMostRecentAnnotationsByType} for one object:
+     * it must be non-null, contain each annotation type at most once, and reference only annotations
+     * that the annotation store returns for the object.
+     *
+     * @param mds metadata store (not used by this check)
+     * @param as  annotation store to query
+     * @param ref reference of the annotated object
+     */
+    public void checkMostRecentAnnotations(MetadataStore mds, AnnotationStore as, MetaDataObjectReference ref) {
+        Map<MetaDataObjectReference, MetaDataObject> ref2Retrieved = new HashMap<>();
+        for (Annotation annot : as.getAnnotations(ref, null)) {
+            ref2Retrieved.put(annot.getReference(), annot);
+        }
+
+        List<Annotation> mostRecentAnnotations = AnnotationStoreUtils.getMostRecentAnnotationsByType(as, ref);
+        Assert.assertNotNull("Most recent annotations must not be null", mostRecentAnnotations);
+        Assert.assertTrue("More most recent annotations than annotations in the store", mostRecentAnnotations.size() <= ref2Retrieved.size());
+
+        Set<MetaDataObjectReference> mostRecentAnnotationRefs = new HashSet<>();
+        Set<String> annotationTypes = new HashSet<>();
+        for (Annotation annot : mostRecentAnnotations) {
+            // every annotation type occurs at most once
+            Assert.assertFalse("Annotation type occurs more than once: " + annot.getAnnotationType(),
+                    annotationTypes.contains(annot.getAnnotationType()));
+            mostRecentAnnotationRefs.add(annot.getReference());
+            annotationTypes.add(annot.getAnnotationType());
+        }
+
+        // all most recent annotations are a subset of all annotations
+        Assert.assertTrue("Most recent annotations are not a subset of all annotations",
+                ref2Retrieved.keySet().containsAll(mostRecentAnnotationRefs));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreExtendedAnnotations.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreExtendedAnnotations.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreExtendedAnnotations.java
new file mode 100755
index 0000000..f0742aa
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreExtendedAnnotations.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.integrationtest;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.test.annotation.TestSyncDiscoveryServiceWritingExtendedAnnotations.MyObject;
+import org.apache.atlas.odf.core.test.annotation.TestSyncDiscoveryServiceWritingExtendedAnnotations.MyOtherObject;
+import org.apache.atlas.odf.core.test.annotation.TestSyncDiscoveryServiceWritingExtendedAnnotations.SyncDiscoveryServiceAnnotation;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ * Integration test that runs the discovery service "synctestservice-with-extendedannotations" and
+ * checks the extended annotations found for each analyzed data set.
+ */
+public class ODFAPITestWithMetadataStoreExtendedAnnotations extends ODFAPITestWithMetadataStoreBase {
+
+    /**
+     * Runs the service on the sample data sets, expects the request to finish, and verifies that each
+     * data set carries exactly two annotations of this run with consistent nested properties.
+     */
+    @Test
+    public void testSuccessSyncExtendedAnnotations() throws Exception {
+        MetadataStore mds = new ODFFactory().create().getMetadataStore();
+        AnnotationStore as = new ODFFactory().create().getAnnotationStore();
+        List<MetaDataObjectReference> dataSets = getTables(mds);
+        String dsID = "synctestservice-with-extendedannotations";
+
+        String requestId = test(dsID, dataSets, State.FINISHED, false, null);
+        Assert.assertNotNull(requestId);
+
+        log.info("Checking if extended annotations exist for request ID: " + requestId);
+        for (MetaDataObjectReference dataSet : dataSets) {
+            List<Annotation> annots = as.getAnnotations(dataSet, null);
+            Assert.assertTrue(annots.size() >= 2);
+
+            List<SyncDiscoveryServiceAnnotation> annotations = new ArrayList<>();
+            for (Annotation annot : annots) {
+                Assert.assertNotNull(annot);
+                // compare from the request ID side: an annotation without an analysis run
+                // is simply skipped instead of failing the test with a NullPointerException
+                if (!requestId.equals(annot.getAnalysisRun())) {
+                    continue;
+                }
+                log.info("Found annotation: " + annot + ", json: " + JSONUtils.toJSON(annot));
+                Assert.assertEquals(SyncDiscoveryServiceAnnotation.class, annot.getClass());
+                SyncDiscoveryServiceAnnotation extAnnot = (SyncDiscoveryServiceAnnotation) annot;
+                Assert.assertNotNull(extAnnot.getProp1());
+                Assert.assertEquals(extAnnot.getProp1().hashCode(), extAnnot.getProp2());
+
+                MyObject mo = extAnnot.getProp3();
+                Assert.assertNotNull(mo);
+                Assert.assertEquals("nested" + extAnnot.getProp1(), mo.getAnotherProp());
+
+                MyOtherObject moo = mo.getYetAnotherProp();
+                Assert.assertNotNull(moo);
+                Assert.assertEquals("nestedtwolevels" + extAnnot.getProp1(), moo.getMyOtherObjectProperty());
+                annotations.add(extAnnot);
+            }
+            Assert.assertEquals(2, annotations.size());
+            // TODO check annotations list
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreJsonAnnotation.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreJsonAnnotation.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreJsonAnnotation.java
new file mode 100755
index 0000000..e47b316
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreJsonAnnotation.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.integrationtest;
+
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ * Integration test that runs the discovery service "synctestservice-with-json-annotations" and
+ * compares the JSON properties of the annotations of that run with an expected JSON document.
+ */
+public class ODFAPITestWithMetadataStoreJsonAnnotation extends ODFAPITestWithMetadataStoreBase {
+
+    Logger logger = ODFTestLogger.get();
+
+    // expected JSON properties of the annotations, read from the classpath
+    String expectedJson = Utils.getInputStreamAsString(this.getClass().getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/integrationtest/metadata/internal/atlas/nested_annotation_example.json"), "UTF-8");
+
+    /**
+     * Runs the service on the sample data sets, expects the request to finish, and checks that every
+     * annotation belonging to this run has exactly the expected JSON properties.
+     */
+    @Test
+    public void testSuccessSyncJsonAnnotations() throws Exception {
+        MetadataStore mds = new ODFFactory().create().getMetadataStore();
+        AnnotationStore as = new ODFFactory().create().getAnnotationStore();
+        List<MetaDataObjectReference> dataSets = getTables(mds);
+        String dsID = "synctestservice-with-json-annotations";
+
+        String requestId = test(dsID, dataSets, State.FINISHED, false, null);
+        Assert.assertNotNull(requestId);
+
+        log.info("Checking if annotations exist for request ID: " + requestId);
+        int numMatchingAnnotations = 0;
+        for (MetaDataObjectReference dataSet : dataSets) {
+            List<Annotation> annotationRefs = as.getAnnotations(dataSet, null);
+            Assert.assertTrue(annotationRefs.size() >= 1);
+            for (Annotation annot : annotationRefs) {
+                Assert.assertNotNull(annot);
+                // compare from the request ID side: an annotation without an analysis run
+                // is simply skipped instead of failing the test with a NullPointerException
+                if (!requestId.equals(annot.getAnalysisRun())) {
+                    continue;
+                }
+                log.info("Found annotation: " + annot + ", json: " + JSONUtils.toJSON(annot));
+                String jsonProperties = annot.getJsonProperties();
+                Assert.assertNotNull(jsonProperties);
+                logger.info("Actual annotation string: " + jsonProperties + ". Expected json: " + expectedJson);
+                Assert.assertEquals(expectedJson, jsonProperties);
+                numMatchingAnnotations++;
+            }
+        }
+        // NOTE(review): a check "assertEquals(1, numMatchingAnnotations)" inside the data set loop was
+        // disabled in the original. The counter is never reset, so that check could only hold for the
+        // first data set. Confirm the expected number per data set before turning the count into an assertion.
+        log.info("Number of annotations matching request ID " + requestId + ": " + numMatchingAnnotations);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreSimple.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreSimple.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreSimple.java
new file mode 100755
index 0000000..6b7c9b9
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/ODFAPITestWithMetadataStoreSimple.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.integrationtest;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.core.test.discoveryservice.TestSyncDiscoveryServiceWritingAnnotations1;
+import org.apache.wink.json4j.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+
+/**
+ * Integration tests that run the sync and async annotation-writing test services, once on the sample
+ * data sets (expected to finish) and once on an invalid data set reference (expected to be rejected).
+ */
+public class ODFAPITestWithMetadataStoreSimple extends ODFAPITestWithMetadataStoreBase {
+
+    /** Logs the classpath of the running JVM. */
+    public ODFAPITestWithMetadataStoreSimple() {
+        ODFTestBase.log.info("Classpath: " + System.getProperty("java.class.path"));
+    }
+
+    @Test
+    public void testSuccessASync() throws Exception {
+        testSuccess("asynctestservice-with-annotations");
+    }
+
+    @Test
+    public void testSuccessSync() throws Exception {
+        testSuccess("synctestservice-with-annotations");
+    }
+
+    /**
+     * Runs the given discovery service on the sample data sets and checks, per data set, that the
+     * annotations of this run match the ones recorded by the test service.
+     *
+     * @param dsId ID of the discovery service to run
+     */
+    void testSuccess(String dsId) throws Exception {
+        MetadataStore mds = new ODFFactory().create().getMetadataStore();
+        AnnotationStore as = new ODFFactory().create().getAnnotationStore();
+        List<MetaDataObjectReference> dataSets = getTables(mds);
+
+        String correlationId = UUID.randomUUID().toString();
+        String requestId = test(dsId, dataSets, AnalysisRequestStatus.State.FINISHED, false, correlationId);
+        Thread.sleep(3000); // give time for notifications to arrive
+
+        List<MetaDataObjectReference> annotationsOfThisRun = new ArrayList<>();
+
+        ODFTestBase.log.info("Checking if annotations exist");
+        for (MetaDataObjectReference dataSet : dataSets) {
+            List<Annotation> retrievedAnnotations = as.getAnnotations(dataSet, null);
+            Assert.assertTrue(retrievedAnnotations.size() > 0);
+            List<Annotation> annotations = collectAnnotationsOfRun(retrievedAnnotations, requestId, annotationsOfThisRun);
+
+            ODFTestBase.log.info("Checking that annotation notifications were received");
+            // the service must not have recorded more annotations than this run produced
+            Assert.assertTrue(TestSyncDiscoveryServiceWritingAnnotations1.getNumberOfAnnotations() <= annotations.size());
+            int found = countRecordedAnnotations(annotations, requestId, correlationId);
+            // every recorded annotation was found exactly once
+            Assert.assertEquals(TestSyncDiscoveryServiceWritingAnnotations1.getNumberOfAnnotations(), found);
+
+            checkMostRecentAnnotations(mds, new ODFFactory().create().getAnnotationStore(), dataSet);
+        }
+    }
+
+    /**
+     * Validates the retrieved annotations and returns those whose analysis run equals the request ID;
+     * the references of the returned annotations are appended to {@code annotationsOfThisRun}.
+     */
+    private List<Annotation> collectAnnotationsOfRun(List<Annotation> retrievedAnnotations, String requestId,
+            List<MetaDataObjectReference> annotationsOfThisRun) throws Exception {
+        List<Annotation> matching = new ArrayList<>();
+        for (Annotation current : retrievedAnnotations) {
+            Assert.assertNotNull(current);
+            Assert.assertNotNull(current.getAnalysisRun());
+            if (!current.getAnalysisRun().equals(requestId)) {
+                continue;
+            }
+            annotationsOfThisRun.add(current.getReference());
+            Assert.assertNotNull(current.getJsonProperties());
+            JSONObject properties = new JSONObject(current.getJsonProperties());
+            String annotCorrId = (String) properties.get(TestSyncDiscoveryServiceWritingAnnotations1.REQUEST_PROPERTY_CORRELATION_ID);
+            if (annotCorrId != null) {
+                Assert.assertNotNull(current.getAnnotationType());
+            }
+            matching.add(current);
+        }
+        return matching;
+    }
+
+    /**
+     * Counts the (recorded annotation, stored annotation) pairs that agree on annotation type and
+     * carry the given correlation ID, asserting property value and analysis run for each pair.
+     */
+    private int countRecordedAnnotations(List<Annotation> annotations, String requestId, String correlationId) throws Exception {
+        int matches = 0;
+        for (int n = 0; n < TestSyncDiscoveryServiceWritingAnnotations1.getNumberOfAnnotations(); n++) {
+            String[] recorded = TestSyncDiscoveryServiceWritingAnnotations1.getPropsOfNthAnnotation(n);
+            for (Annotation candidate : annotations) {
+                if (candidate.getAnnotationType() == null || !candidate.getAnnotationType().equals(recorded[0])) {
+                    continue;
+                }
+                JSONObject json = new JSONObject(candidate.getJsonProperties());
+                String foundCorrelationId = (String) json.get(TestSyncDiscoveryServiceWritingAnnotations1.REQUEST_PROPERTY_CORRELATION_ID);
+                // only look at those where the correlation ID property is set
+                if (!correlationId.equals(foundCorrelationId)) {
+                    continue;
+                }
+                String actualValue = (String) json.get(recorded[1]);
+                Assert.assertEquals(recorded[2], actualValue);
+                Assert.assertEquals(requestId, candidate.getAnalysisRun());
+                // annotation type and JSON property both match
+                matches++;
+            }
+        }
+        return matches;
+    }
+
+    @Test
+    public void testFailureASync() throws Exception {
+        testFailure("asynctestservice-with-annotations");
+    }
+
+    @Test
+    public void testFailureSync() throws Exception {
+        testFailure("synctestservice-with-annotations");
+    }
+
+    /**
+     * Runs the given discovery service on a single reference with a made-up ID and expects the
+     * request to be rejected as invalid.
+     *
+     * @param dsId ID of the discovery service to run
+     */
+    void testFailure(String dsId) throws Exception {
+        MetaDataObjectReference invalidRef = new MetaDataObjectReference();
+        invalidRef.setId("error-this-is-hopefully-an-invalid-id");
+        test(dsId, Collections.singletonList(invalidRef), AnalysisRequestStatus.State.ERROR, true, UUID.randomUUID().toString());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/connectivity/DataSetRetrieverTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/connectivity/DataSetRetrieverTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/connectivity/DataSetRetrieverTest.java
new file mode 100755
index 0000000..af70b5a
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/connectivity/DataSetRetrieverTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.integrationtest.connectivity;
+
+import java.sql.ResultSet;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.Table;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.connectivity.DataSetRetriever;
+import org.apache.atlas.odf.api.connectivity.DataSetRetrieverImpl;
+import org.apache.atlas.odf.api.connectivity.JDBCRetrievalResult;
+import org.apache.atlas.odf.api.discoveryservice.datasets.MaterializedDataSet;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.integrationtest.metadata.importer.JDBCMetadataImporterTest;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+
+/**
+ * Integration test for {@link DataSetRetriever}: retrieves one imported table both as a
+ * materialized data set and through a JDBC result set and compares the two.
+ */
+public class DataSetRetrieverTest extends ODFTestBase {
+
+    static Logger logger = ODFTestLogger.get();
+
+    /** Returns the metadata store obtained from a new {@link ODFFactory}. */
+    static MetadataStore createMetadataStore() throws Exception {
+        return new ODFFactory().create().getMetadataStore();
+    }
+
+    /** Runs once per class: creates the sample data and runs the JDBC test import. */
+    @BeforeClass
+    public static void setupImport() throws Exception {
+        MetadataStore mds = createMetadataStore();
+        // create sample data only if it has not been created yet
+        mds.createSampleData();
+        JDBCMetadataImporterTest.runTestImport(mds);
+    }
+
+    /**
+     * Picks the first table the retriever can handle and checks that the materialized data set and
+     * the JDBC result set agree on the number of columns and rows.
+     */
+    @Test
+    public void testDataSetRetrievalJDBC() throws Exception {
+        MetadataStore ams = createMetadataStore();
+        DataSetRetriever retriever = new DataSetRetrieverImpl(ams);
+        List<MetaDataObjectReference> refs = ams.search(ams.newQueryBuilder().objectType("Table").build());
+        Assert.assertTrue(refs.size() > 0);
+        int retrievedDataSets = 0;
+        for (MetaDataObjectReference ref : refs) {
+            Table table = (Table) ams.retrieve(ref);
+            logger.info("Retrieving table: " + table.getName() + ", " + table.getReference().getUrl());
+            if (!retriever.canRetrieveDataSet(table)) {
+                continue;
+            }
+            retrievedDataSets++;
+            MaterializedDataSet dataSet = retriever.retrieveRelationalDataSet(table);
+            Assert.assertNotNull(dataSet);
+            Assert.assertEquals(table, dataSet.getTable());
+            int numberOfColumns = ams.getColumns(table).size();
+            Assert.assertEquals(numberOfColumns, dataSet.getColumns().size());
+            Assert.assertNotNull(dataSet.getData());
+            Assert.assertTrue(dataSet.getData().size() > 0);
+            for (List<Object> row : dataSet.getData()) {
+                // expected value first, so that a failure message reports the sizes the right way round
+                Assert.assertEquals(numberOfColumns, row.size());
+            }
+
+            // now test JDBC method
+            JDBCRetrievalResult jdbcResult = retriever.retrieveTableAsJDBCResultSet(table);
+            int count = 0;
+            // NOTE(review): only the result set is closed here; whether the statement or its
+            // connection need an explicit close depends on JDBCRetrievalResult - confirm.
+            try (ResultSet rs = jdbcResult.getPreparedStatement().executeQuery()) {
+                Assert.assertEquals(dataSet.getColumns().size(), rs.getMetaData().getColumnCount());
+                while (rs.next()) {
+                    count++;
+                }
+            }
+            Assert.assertEquals(dataSet.getData().size(), count);
+
+            // only run one test
+            break;
+        }
+        Assert.assertEquals("Number of retrieved data sets does not meet the expected value. ", 1, retrievedDataSets);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/MetadataStoreTestBase.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/MetadataStoreTestBase.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/MetadataStoreTestBase.java
new file mode 100755
index 0000000..47d3a3d
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/MetadataStoreTestBase.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.integrationtest.metadata;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.metadata.WritableMetadataStore;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.metadata.DefaultMetadataQueryBuilder;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataQueryBuilder;
+import org.apache.atlas.odf.api.metadata.models.Schema;
+import org.apache.atlas.odf.api.metadata.models.Table;
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+import org.apache.atlas.odf.api.metadata.models.RelationshipAnnotation;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.MetadataStoreException;
+import org.apache.atlas.odf.api.metadata.models.ClassificationAnnotation;
+import org.apache.atlas.odf.api.metadata.models.Column;
+import org.apache.atlas.odf.api.metadata.models.Connection;
+import org.apache.atlas.odf.api.metadata.models.DataFile;
+import org.apache.atlas.odf.api.metadata.models.DataFileFolder;
+import org.apache.atlas.odf.api.metadata.models.JDBCConnection;
+import org.apache.atlas.odf.api.metadata.models.JDBCConnectionInfo;
+import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
+import org.apache.atlas.odf.api.metadata.models.Database;
+
+/**
+ * Base class for metadata store integration tests.
+ *
+ * Subclasses provide the store under test through {@link #getMetadataStore()}. Before each
+ * test the store returned by {@link #getWritableMetadataStore()} is reset and filled with the
+ * sample data plus the additional objects created in
+ * {@link #createAdditionalTestData(WritableMetadataStore)}.
+ */
+public abstract class MetadataStoreTestBase {
+    private static final Logger logger = Logger.getLogger(MetadataStoreTestBase.class.getName());
+
+    // Random id assigned to every annotation created by createAdditionalTestData() so that
+    // testAnnotations() can find them again through an "analysisRun" query condition.
+    private static final String analysisRun = UUID.randomUUID().toString();
+
+    /**
+     * @return the metadata store that the tests of this class run their queries against
+     */
+    protected abstract MetadataStore getMetadataStore();
+
+    /**
+     * Returns the metadata store created by the {@link ODFFactory} as a writable store.
+     * Fails the running test if that store does not implement {@link WritableMetadataStore}.
+     *
+     * @return the writable metadata store
+     */
+    public static WritableMetadataStore getWritableMetadataStore() {
+        MetadataStore mds = new ODFFactory().create().getMetadataStore();
+        Assert.assertNotNull("The ODF factory did not return a metadata store.", mds);
+        if (!(mds instanceof WritableMetadataStore)) {
+            String errorText = "The MetadataStore implementation ''{0}'' does not support the WritableMetadataStore interface.";
+            // Assert.fail() always throws, so the cast below is only reached for writable stores.
+            Assert.fail(MessageFormat.format(errorText, mds.getClass()));
+        }
+        return (WritableMetadataStore) mds;
+    }
+
+    /**
+     * Adds the objects that the tests of this class rely on to the given store and commits them:
+     * a database ("database1") with one schema, two tables and a JDBC connection, a root folder
+     * with a nested folder holding two data files, and one profiling, one classification and one
+     * relationship annotation that refer to the "BankClientsShort" data file.
+     *
+     * The "BankClientsShort" data file must already exist in the store (presumably it is part of
+     * the sample data created by {@code createSampleData()}).
+     *
+     * @param mds the writable store to add the test objects to
+     */
+    public static void createAdditionalTestData(WritableMetadataStore mds) {
+        List<MetaDataObjectReference> bankClientsShortRefs = mds.search(mds.newQueryBuilder().objectType("DataFile").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "BankClientsShort").build());
+        // Fail with a readable message instead of an IndexOutOfBoundsException when the sample data is missing.
+        Assert.assertFalse("The data file 'BankClientsShort' was not found in the metadata store.", bankClientsShortRefs.isEmpty());
+        MetaDataObjectReference bankClientsShortRef = bankClientsShortRefs.get(0);
+
+        JDBCConnection connection = new JDBCConnection();
+        connection.setName("connection1");
+
+        Table table1 = new Table();
+        table1.setName("table1");
+        Table table2 = new Table();
+        table2.setName("table2");
+
+        Schema schema1 = new Schema();
+        schema1.setName("schema1");
+        MetaDataObjectReference schemaRef = mds.createObject(schema1);
+        mds.addTableReference(schema1, mds.createObject(table1));
+        mds.addTableReference(schema1, mds.createObject(table2));
+
+        Database dataStore = new Database();
+        dataStore.setName("database1");
+        mds.createObject(dataStore);
+        mds.addSchemaReference(dataStore, schemaRef);
+        mds.addConnectionReference(dataStore, mds.createObject(connection));
+
+        DataFile file1 = new DataFile();
+        file1.setName("file1");
+        DataFile file2 = new DataFile();
+        file2.setName("file2");
+
+        DataFileFolder nestedFolder = new DataFileFolder();
+        nestedFolder.setName("nestedFolder");
+        MetaDataObjectReference nestedFolderRef = mds.createObject(nestedFolder);
+        mds.addDataFileReference(nestedFolder, mds.createObject(file1));
+        mds.addDataFileReference(nestedFolder, mds.createObject(file2));
+
+        DataFileFolder rootFolder = new DataFileFolder();
+        rootFolder.setName("rootFolder");
+        mds.createObject(rootFolder);
+        mds.addDataFileFolderReference(rootFolder, nestedFolderRef);
+
+        ProfilingAnnotation pa = new ProfilingAnnotation();
+        pa.setName("A profiling annotation");
+        pa.setProfiledObject(bankClientsShortRef);
+        pa.setAnalysisRun(analysisRun);
+        mds.createObject(pa);
+
+        ClassificationAnnotation ca = new ClassificationAnnotation();
+        ca.setName("A classification annotation");
+        ca.setClassifiedObject(bankClientsShortRef);
+        ca.setAnalysisRun(analysisRun);
+        ca.setClassifyingObjects(Collections.singletonList(bankClientsShortRef));
+        mds.createObject(ca);
+
+        RelationshipAnnotation ra = new RelationshipAnnotation();
+        ra.setName("A relationship annotation");
+        ra.setRelatedObjects(Collections.singletonList(bankClientsShortRef));
+        ra.setAnalysisRun(analysisRun);
+        mds.createObject(ra);
+
+        mds.commit();
+    }
+
+    /**
+     * Resets the writable metadata store and recreates the sample data and the additional
+     * test data before every test.
+     */
+    @Before
+    public void createSampleData() {
+        WritableMetadataStore mds = getWritableMetadataStore();
+        mds.resetAllData();
+        mds.createSampleData();
+        createAdditionalTestData(mds);
+    }
+
+    /**
+     * Runs a search and compares the names of the objects found with the expected names.
+     *
+     * @param mds the store to search
+     * @param expectedObjectNames names of the objects the search is expected to return
+     * @param searchTerm the query to run
+     * @param isSubset if {@code true} the result only has to contain all expected names;
+     *        if {@code false} the set of names found must equal the set of expected names
+     */
+    public static void checkQueryResults(MetadataStore mds, String[] expectedObjectNames, String searchTerm, boolean isSubset) {
+        Set<String> expectedResults = new HashSet<String>(Arrays.asList(expectedObjectNames));
+        List<MetaDataObjectReference> searchResult = mds.search(searchTerm);
+        Set<String> foundResults = new HashSet<>();
+        for (MetaDataObjectReference ref : searchResult) {
+            foundResults.add(mds.retrieve(ref).getName());
+        }
+        if (isSubset) {
+            String messageText = "Metadata search term ''{0}'' did not return expected subset of objects. Expected ''{1}'' but received ''{2}''.";
+            Assert.assertTrue(MessageFormat.format(messageText, searchTerm, expectedResults, foundResults), foundResults.containsAll(expectedResults));
+        } else {
+            String messageText = "Metadata search term ''{0}'' did not return expected results. Expected ''{1}'' but received ''{2}''.";
+            Assert.assertTrue(MessageFormat.format(messageText, searchTerm, expectedResults, foundResults), foundResults.equals(expectedResults));
+        }
+    }
+
+    /**
+     * Compares the names of the given objects with the expected names.
+     *
+     * @param expectedObjectNames names that are expected
+     * @param referencedObjects the objects whose names are checked
+     * @param isSubset if {@code true} the actual names only have to contain all expected names;
+     *        if {@code false} both sets of names must be equal
+     */
+    public static void checkReferencedObjects(String[] expectedObjectNames, List<? extends MetaDataObject> referencedObjects, boolean isSubset) {
+        Set<String> expectedResults = new HashSet<String>(Arrays.asList(expectedObjectNames));
+        Set<String> actualNames = new HashSet<>();
+        for (MetaDataObject obj : referencedObjects) {
+            actualNames.add(obj.getName());
+        }
+        if (isSubset) {
+            // The check requires every expected name to be present, so the message says exactly that.
+            String messageText = "Expected object names ''{1}'' are not a subset of actual names ''{0}''.";
+            Assert.assertTrue(MessageFormat.format(messageText, actualNames, expectedResults), actualNames.containsAll(expectedResults));
+        } else {
+            String messageText = "Actual object names ''{0}'' do not match expected names ''{1}''.";
+            Assert.assertTrue(MessageFormat.format(messageText, actualNames, expectedResults), actualNames.equals(expectedResults));
+        }
+    }
+
+    /**
+     * Checks that an incorrect query is rejected: the search must either return {@code null}
+     * or throw a {@link MetadataStoreException}.
+     *
+     * @param mds the store to search
+     * @param searchTerm the incorrect query
+     */
+    void checkFailingQuery(MetadataStore mds, String searchTerm) {
+        try {
+            logger.log(Level.INFO, "Checking incorrect query \"{0}\"", searchTerm);
+            List<MetaDataObjectReference> searchResult = mds.search(searchTerm);
+            if (searchResult != null) {
+                // Search must return null or throw exception
+                Assert.fail(MessageFormat.format("Incorrect query \"{0}\" did not throw the expected exception.", searchTerm));
+            }
+        } catch (MetadataStoreException e) {
+            logger.log(Level.INFO, "Catching expected exception.", e);
+        }
+    }
+
+    /**
+     * Tests searching by name conditions and by object types on different levels of the
+     * type hierarchy, and retrieving an object from a search result.
+     */
+    @Test
+    public void testSearchAndRetrieve() {
+        MetadataStore mds = getMetadataStore();
+        MetaDataObjectReference bankClientsShortRef = mds.search(mds.newQueryBuilder().objectType("DataFile").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "BankClientsShort").build()).get(0);
+        Assert.assertEquals("The metadata store did not retrieve the object with the expected name.", "BankClientsShort", mds.retrieve(bankClientsShortRef).getName());
+
+        // Test queries with conditions
+        checkQueryResults(mds, new String[] { "BankClientsShort" }, mds.newQueryBuilder().objectType("DataFile").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "BankClientsShort").build(), false);
+        checkQueryResults(mds, new String[] { "SimpleExampleTable", "file2", "file1"}, mds.newQueryBuilder().objectType("DataFile").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.NOT_EQUALS, "BankClientsShort").build(), false);
+        checkQueryResults(mds, new String[] { "NAME" },
+                mds.newQueryBuilder().objectType("Column").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "NAME").simpleCondition("dataType", MetadataQueryBuilder.COMPARATOR.EQUALS, "string").build(), false);
+
+        // Test type hierarchy
+        checkQueryResults(mds, new String[] { "BankClientsShort", "SimpleExampleTable" }, mds.newQueryBuilder().objectType("DataFile").build(), true);
+        checkQueryResults(mds, new String[] { "BankClientsShort", "SimpleExampleTable" }, mds.newQueryBuilder().objectType("RelationalDataSet").build(), true);
+        checkQueryResults(mds, new String[] { "BankClientsShort", "SimpleExampleTable", "Simple URL example document", "Simple local example document", "table1", "table2", "file2", "file1" }, mds.newQueryBuilder().objectType("DataSet").build(), false);
+        checkQueryResults(mds, new String[] { "BankClientsShort" }, mds.newQueryBuilder().objectType("MetaDataObject").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "BankClientsShort").build(), false);
+    }
+
+    /**
+     * Asserts that a query for objects of type "DataStore" returns exactly "database1"
+     * and returns that database.
+     */
+    public static Database getDatabaseTestObject(MetadataStore mds) {
+        String dataStoreQuery = mds.newQueryBuilder().objectType("DataStore").build();
+        MetadataStoreTestBase.checkQueryResults(mds, new String[] { "database1"}, dataStoreQuery, false);
+        return (Database) mds.retrieve(mds.search(dataStoreQuery).get(0));
+    }
+
+    /**
+     * Asserts that a query for the table named "table1" returns exactly that table and returns it.
+     */
+    public static Table getTableTestObject(MetadataStore mds) {
+        String tableQuery = mds.newQueryBuilder().objectType("Table").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "table1").build();
+        MetadataStoreTestBase.checkQueryResults(mds, new String[] { "table1"}, tableQuery, false);
+        return (Table) mds.retrieve(mds.search(tableQuery).get(0));
+    }
+
+    /**
+     * Asserts that a query for the data file named "SimpleExampleTable" returns exactly that
+     * file and returns it.
+     */
+    public static DataFile getDataFileTestObject(MetadataStore mds) {
+        String dataFileQuery = mds.newQueryBuilder().objectType("DataFile").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "SimpleExampleTable").build();
+        MetadataStoreTestBase.checkQueryResults(mds, new String[] { "SimpleExampleTable"}, dataFileQuery, false);
+        return (DataFile) mds.retrieve(mds.search(dataFileQuery).get(0));
+    }
+
+    /**
+     * Asserts that a query for the folder named "rootFolder" returns exactly that folder
+     * and returns it.
+     */
+    public static DataFileFolder getDataFileFolderTestObject(MetadataStore mds) {
+        String folderQuery = mds.newQueryBuilder().objectType("DataFileFolder").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "rootFolder").build();
+        MetadataStoreTestBase.checkQueryResults(mds, new String[] { "rootFolder"}, folderQuery, false);
+        return (DataFileFolder) mds.retrieve(mds.search(folderQuery).get(0));
+    }
+
+    /**
+     * Checks that the database refers to "schema1", that this schema refers to "table1" and
+     * "table2", and that the database refers to "connection1".
+     */
+    public static void checkReferences(MetadataStore mds, Database database) throws Exception {
+        List<Schema> schemaList = mds.getSchemas(database);
+        MetadataStoreTestBase.checkReferencedObjects(new String[] { "schema1" }, schemaList, false);
+        List<Table> tableList = mds.getTables(schemaList.get(0));
+        MetadataStoreTestBase.checkReferencedObjects(new String[] { "table1", "table2" }, tableList, false);
+        List<Connection> connectionList = mds.getConnections(database);
+        MetadataStoreTestBase.checkReferencedObjects(new String[] { "connection1" }, connectionList, false);
+    }
+
+    /**
+     * Checks that the connection info of the table names "connection1" as its first
+     * connection and "schema1" as its schema.
+     */
+    public static void checkReferences(MetadataStore mds, Table table) throws Exception {
+        JDBCConnectionInfo connectionInfo = (JDBCConnectionInfo) mds.getConnectionInfo(table);
+        Assert.assertTrue("Connection is not set in connection info.", connectionInfo.getConnections().size() > 0);
+        Assert.assertEquals("Connection does not match expected name.", "connection1", connectionInfo.getConnections().get(0).getName());
+        Assert.assertEquals("Schema name of connection info does not match expected value.", "schema1", connectionInfo.getSchemaName());
+    }
+
+    /**
+     * Checks that the folder contains "nestedFolder" and that the nested folder contains
+     * "file1" and "file2".
+     */
+    public static void checkReferences(MetadataStore mds, DataFileFolder folder) throws Exception {
+        List<DataFileFolder> nestedFolderList = mds.getDataFileFolders(folder);
+        MetadataStoreTestBase.checkReferencedObjects(new String[] { "nestedFolder" }, nestedFolderList, false);
+        List<DataFile> fileList = mds.getDataFiles(nestedFolderList.get(0));
+        MetadataStoreTestBase.checkReferencedObjects(new String[] { "file1", "file2" }, fileList, false);
+    }
+
+    /**
+     * Checks that the columns and the children of the file are "ColumnName1" and "ColumnName2"
+     * and that the parent of the first column is "SimpleExampleTable".
+     */
+    public static void checkReferences(MetadataStore mds, DataFile file) throws Exception {
+        List<Column> columnList = mds.getColumns(file);
+        MetadataStoreTestBase.checkReferencedObjects(new String[] { "ColumnName1", "ColumnName2" }, columnList, false);
+        MetadataStoreTestBase.checkReferencedObjects(new String[] { "SimpleExampleTable" }, Collections.singletonList(mds.getParent(columnList.get(0))), false);
+        MetadataStoreTestBase.checkReferencedObjects(new String[] { "ColumnName1", "ColumnName2" }, mds.getChildren(file), false);
+    }
+
+    /**
+     * Tests the references between the database, table, folder and data file test objects.
+     */
+    @Test
+    public void testReferences() throws Exception {
+        MetadataStore mds = getMetadataStore();
+        checkReferences(mds, getDatabaseTestObject(mds));
+        checkReferences(mds, getTableTestObject(mds));
+        checkReferences(mds, getDataFileFolderTestObject(mds));
+        checkReferences(mds, getDataFileTestObject(mds));
+    }
+
+    /**
+     * Tests how the store handles non-existent references, queries without matches and
+     * malformed queries.
+     */
+    @Test
+    public void testErrorHandling() {
+        MetadataStore mds = getMetadataStore();
+        MetaDataObjectReference nonExistentRef = new MetaDataObjectReference();
+        nonExistentRef.setId("non-existing-reference-id");
+        nonExistentRef.setRepositoryId(mds.getRepositoryId());
+
+        Assert.assertNull("A null value was expected when retrieving a non-existent object.", mds.retrieve(nonExistentRef));
+        String errorText = "Metadata search should have returned an empty result set.";
+        // JUnit expects the expected value first, the actual value second.
+        Assert.assertEquals(errorText, new ArrayList<MetaDataObjectReference>(), mds.search(mds.newQueryBuilder().objectType("nonExistentType").build()));
+        Assert.assertEquals(errorText, new ArrayList<MetaDataObjectReference>(), mds.search(mds.newQueryBuilder().objectType("DataFile").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "nonExistentName").build()));
+
+        // The malformed-query checks are skipped for Atlas. Comparing from the constant side
+        // keeps the check null-safe when the store does not report a type property.
+        if (!"atlas".equals(mds.getProperties().get(MetadataStore.STORE_PROPERTY_TYPE))) {
+            // Not run against Atlas because Atlas accepts this query as text search
+            checkFailingQuery(mds, "justAsSingleToken");
+            // Not run against Atlas because Atlas does not return an error for these queries
+            String validQueryWithCondition = mds.newQueryBuilder().objectType("DataFile").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, "BankClientsShort").build();
+            checkFailingQuery(mds, validQueryWithCondition + DefaultMetadataQueryBuilder.SEPARATOR_STRING + "additionalTrailingToken");
+            String validDataSetQuery = mds.newQueryBuilder().objectType("DataFile").build();
+            checkFailingQuery(mds, validDataSetQuery + DefaultMetadataQueryBuilder.SEPARATOR_STRING + "additionalTrailingToken");
+        }
+    }
+
+    /**
+     * Tests that the three test annotations are found both by a query for all annotations
+     * and by a query restricted to the analysis run of this test class.
+     */
+    @Test
+    public void testAnnotations() {
+        MetadataStore mds = getMetadataStore();
+
+        String annotationQueryString = mds.newQueryBuilder().objectType("Annotation").build();
+        checkQueryResults(mds, new String[] { "A profiling annotation", "A classification annotation", "A relationship annotation" }, annotationQueryString, false);
+        String analysisRunQuery = mds.newQueryBuilder().objectType("Annotation").simpleCondition("analysisRun", MetadataQueryBuilder.COMPARATOR.EQUALS, analysisRun).build();
+        checkQueryResults(mds, new String[] { "A profiling annotation", "A classification annotation", "A relationship annotation" }, analysisRunQuery, false);
+    }
+
+    /**
+     * Tests that a query for all metadata objects returns nothing after the store was reset.
+     */
+    @Test
+    public void testResetAllData() {
+        MetadataStore mds = getMetadataStore();
+        mds.resetAllData();
+        String emptyResultSet = mds.newQueryBuilder().objectType("MetaDataObject").build();
+        checkQueryResults(mds, new String[] {}, emptyResultSet, false);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/WritableMetadataStoreTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/WritableMetadataStoreTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/WritableMetadataStoreTest.java
new file mode 100755
index 0000000..5012ab3
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/WritableMetadataStoreTest.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.integrationtest.metadata;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+
+/**
+ * Runs the tests inherited from {@link MetadataStoreTestBase} against the metadata store
+ * created by the {@link ODFFactory}.
+ */
+public class WritableMetadataStoreTest extends MetadataStoreTestBase {
+
+    /**
+     * @return the metadata store obtained from a newly created {@link ODFFactory}
+     */
+    @Override
+    protected MetadataStore getMetadataStore() {
+        return new ODFFactory().create().getMetadataStore();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/importer/JDBCMetadataImporterTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/importer/JDBCMetadataImporterTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/importer/JDBCMetadataImporterTest.java
new file mode 100755
index 0000000..1f00a94
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/importer/JDBCMetadataImporterTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.integrationtest.metadata.importer;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataQueryBuilder;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.importer.JDBCMetadataImportResult;
+import org.apache.atlas.odf.api.metadata.importer.JDBCMetadataImporter;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.metadata.models.JDBCConnection;
+import org.apache.atlas.odf.api.metadata.models.Schema;
+import org.apache.atlas.odf.api.metadata.models.Column;
+import org.apache.atlas.odf.api.metadata.models.Database;
+import org.apache.atlas.odf.api.metadata.models.Table;
+
+public class JDBCMetadataImporterTest extends ODFTestBase {
+ static Logger logger = Logger.getLogger(JDBCMetadataImporterTest.class.getName());
+
+ static boolean testDBRan = false;
+ public static final String SOURCE_DB1 = "DBSAMPLE1";
+ public static final String SOURCE_DB2 = "DBSAMPLE2";
+ public static final String DATABASE1_NAME = SOURCE_DB1;
+ public static final String DATABASE2_NAME =SOURCE_DB2;
+ public static final String SCHEMA1_NAME = "APP1";
+ public static final String SCHEMA2_NAME = "APP2";
+ public static final String TABLE1_NAME = "EMPLOYEE" + System.currentTimeMillis();
+ public static final String TABLE2_NAME = "EMPLOYEE_SHORT" + System.currentTimeMillis();
+
+ @BeforeClass
+ public static void populateTestDB() throws Exception {
+ if (testDBRan) {
+ return;
+ }
+ createTestTables(SOURCE_DB1, SCHEMA1_NAME, TABLE1_NAME, TABLE2_NAME);
+ createTestTables(SOURCE_DB1, SCHEMA2_NAME, TABLE1_NAME, TABLE2_NAME);
+ // Switch table names so that the table named TABLE2_NAME has more columns in the SOURCE_DB2 than it has in SOURCE_DB1
+ createTestTables(SOURCE_DB2, SCHEMA1_NAME, TABLE2_NAME, TABLE1_NAME);
+ testDBRan = true;
+ }
+
+ private static String getConnectionUrl(String dbName) {
+ String dbDir = "/tmp/odf-derby/" + dbName;
+ String connectionURL = "jdbc:derby:" + dbDir + ";create=true";
+ return connectionURL;
+ }
+
+ private static void createTestTables(String dbName, String schemaName, String tableName1, String tableName2) throws Exception {
+ Connection conn = DriverManager.getConnection(getConnectionUrl(dbName));
+
+ String[] stats = new String[] {
+ "CREATE TABLE " + schemaName + "." + tableName1 + " (\r\n" + //
+ " EMPNO CHAR(6) NOT NULL,\r\n" + //
+ " FIRSTNME VARCHAR(12) NOT NULL,\r\n" + //
+ " MIDINIT CHAR(1),\r\n" + //
+ " LASTNAME VARCHAR(15) NOT NULL,\r\n" + //
+ " WORKDEPT CHAR(3),\r\n" + //
+ " PHONENO CHAR(4),\r\n" + //
+ " HIREDATE DATE,\r\n" + //
+ " JOB CHAR(8),\r\n" + //
+ " EDLEVEL SMALLINT NOT NULL,\r\n" + //
+ " SEX CHAR(1),\r\n" + //
+ " BIRTHDATE DATE,\r\n" + //
+ " SALARY DECIMAL(9 , 2),\r\n" + //
+ " BONUS DECIMAL(9 , 2),\r\n" + //
+ " COMM DECIMAL(9 , 2)\r\n" + //
+ " )",
+ "INSERT INTO " + schemaName + "." + tableName1 + " VALUES ('000010','CHRISTINE','I','HAAS','A00','3978','1995-01-01','PRES ',18,'F','1963-08-24',152750.00,1000.00,4220.00)",
+ "INSERT INTO " + schemaName + "." + tableName1 + " VALUES ('000020','MICHAEL','L','THOMPSON','B01','3476','2003-10-10','MANAGER ',18,'M','1978-02-02',94250.00,800.00,3300.00)",
+ // Note that the 2nd table has a subset of the columns of the first table
+ "CREATE TABLE " + schemaName + "." + tableName2 + " (\r\n" + //
+ " EMPNO CHAR(6) NOT NULL,\r\n" + //
+ " FIRSTNME VARCHAR(12) NOT NULL,\r\n" + //
+ " MIDINIT CHAR(1),\r\n" + //
+ " LASTNAME VARCHAR(15) NOT NULL\r\n" + //
+ " )",
+ "INSERT INTO " + schemaName + "." + tableName2 + " VALUES ('000010','CHRISTINE','I','HAAS')",
+ "INSERT INTO " + schemaName + "." + tableName2 + " VALUES ('000020','MICHAEL','L','THOMPSON')"
+ };
+
+ for (String stat : stats) {
+ boolean result = conn.createStatement().execute(stat);
+ logger.info("Result of statement: " + result);
+ }
+ }
+
+ private static void runTestImport(MetadataStore mds, String connectionDbName, String importDbName, String schemaName, String tableName) throws Exception {
+ populateTestDB();
+ JDBCMetadataImporter importer = new ODFInternalFactory().create(JDBCMetadataImporter.class);
+ JDBCConnection conn = new JDBCConnection();
+ conn.setJdbcConnectionString(getConnectionUrl(connectionDbName));
+ conn.setUser("dummyUser");
+ conn.setPassword("dummyPassword");
+ JDBCMetadataImportResult importResult = importer.importTables(conn, importDbName, schemaName, tableName);
+ Assert.assertTrue("JDBCMetadataImportResult does not refer to imported database.", importResult.getDatabaseName().equals(importDbName));
+ Assert.assertTrue("JDBCMetadataImportResult does not refer to imported table.", importResult.getTableNames().contains(schemaName + "." + tableName));
+ }
+
+ public static void runTestImport(MetadataStore mds) throws Exception {
+ runTestImport(mds, SOURCE_DB1, DATABASE1_NAME, SCHEMA1_NAME, TABLE1_NAME);
+ }
+
+ @Test
+ public void testSimpleImport() throws Exception {
+ MetadataStore ams = new ODFFactory().create().getMetadataStore();
+ ams.resetAllData();
+
+ List<String> expectedDatabases = new ArrayList<String>();
+ HashMap<String, List<String>> expectedSchemasForDatabase = new HashMap<String, List<String>>();
+ HashMap<String, List<String>> expectedTablesForSchema = new HashMap<String, List<String>>();
+ HashMap<String, List<String>> expectedColumnsForTable = new HashMap<String, List<String>>();
+
+ runTestImport(ams, SOURCE_DB1, DATABASE1_NAME, SCHEMA1_NAME, TABLE1_NAME);
+
+ expectedDatabases.add(DATABASE1_NAME);
+ expectedSchemasForDatabase.put(DATABASE1_NAME, new ArrayList<String>());
+ expectedSchemasForDatabase.get(DATABASE1_NAME).add(SCHEMA1_NAME);
+ expectedTablesForSchema.put(SCHEMA1_NAME, new ArrayList<String>());
+ expectedTablesForSchema.get(SCHEMA1_NAME).add(TABLE1_NAME);
+ expectedColumnsForTable.put(TABLE1_NAME, new ArrayList<String>());
+ expectedColumnsForTable.get(TABLE1_NAME).addAll(Arrays.asList(new String[] { "EMPNO", "FIRSTNME", "MIDINIT", "LASTNAME",
+ "WORKDEPT", "PHONENO", "HIREDATE", "JOB", "EDLEVEL", "SEX", "BIRTHDATE", "SALARY", "BONUS", "COMM" }));
+ validateImportedObjects(ams, expectedDatabases, expectedSchemasForDatabase, expectedTablesForSchema, expectedColumnsForTable);
+
+ // Add another table to an existing schema in an existing database
+ runTestImport(ams, SOURCE_DB1, DATABASE1_NAME, SCHEMA1_NAME, TABLE2_NAME);
+
+ expectedTablesForSchema.get(SCHEMA1_NAME).add(TABLE2_NAME);
+ expectedColumnsForTable.put(TABLE2_NAME, new ArrayList<String>());
+ expectedColumnsForTable.get(TABLE2_NAME).addAll(Arrays.asList(new String[] { "EMPNO", "FIRSTNME", "MIDINIT", "LASTNAME" }));
+ validateImportedObjects(ams, expectedDatabases, expectedSchemasForDatabase, expectedTablesForSchema, expectedColumnsForTable);
+
+ // Add another schema and table to an existing database
+ runTestImport(ams, SOURCE_DB1, DATABASE1_NAME, SCHEMA2_NAME, TABLE1_NAME);
+
+ expectedSchemasForDatabase.get(DATABASE1_NAME).add(SCHEMA2_NAME);
+ expectedTablesForSchema.put(SCHEMA2_NAME, new ArrayList<String>());
+ expectedTablesForSchema.get(SCHEMA2_NAME).add(TABLE1_NAME);
+ validateImportedObjects(ams, expectedDatabases, expectedSchemasForDatabase, expectedTablesForSchema, expectedColumnsForTable);
+
+ // Import TABLE2_NAME again from SOURCE_DB2 where it has more columns than in SOURCE_DB1
+ runTestImport(ams, SOURCE_DB2, DATABASE1_NAME, SCHEMA1_NAME, TABLE2_NAME);
+
+ // validate that additional columns have been added to the existing table object TABLE2_NAME.
+ expectedColumnsForTable.get(TABLE2_NAME).addAll(Arrays.asList(new String[] { "WORKDEPT", "PHONENO", "HIREDATE", "JOB", "EDLEVEL", "SEX", "BIRTHDATE", "SALARY", "BONUS", "COMM" }));
+ validateImportedObjects(ams, expectedDatabases, expectedSchemasForDatabase, expectedTablesForSchema, expectedColumnsForTable);
+ }
+
+ private void validateImportedObjects(MetadataStore mds, List<String> expectedDatabases, HashMap<String, List<String>> expectedSchemasForDatabase, HashMap<String,
+ List<String>> expectedTablesForSchema, HashMap<String, List<String>> expectedColumnsForTable) throws Exception{
+ for (String dbName : expectedDatabases) {
+ String query = mds.newQueryBuilder().objectType("Database").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, dbName).build();
+ List<MetaDataObjectReference> dbs = mds.search(query);
+ Assert.assertEquals("Number of databases does not match expected value.", 1, dbs.size());
+ Database database = (Database) mds.retrieve(dbs.get(0));
+ logger.log(Level.INFO, MessageFormat.format("Reference ''{0}''.", JSONUtils.toJSON(database)));
+ int numberOfMatchingConnections = 0;
+ for (org.apache.atlas.odf.api.metadata.models.Connection con : mds.getConnections(database)) {
+ if (getConnectionUrl(database.getName()).equals(((JDBCConnection) mds.retrieve(con.getReference())).getJdbcConnectionString())) {
+ numberOfMatchingConnections++;
+ }
+ }
+ Assert.assertEquals("Number of matching JDBC connections does not match expected value.", 1, numberOfMatchingConnections);
+ List<String> actualSchemaNames = new ArrayList<String>();
+ for (Schema schema : mds.getSchemas(database)) {
+ actualSchemaNames.add(schema.getName());
+
+ List<String> actualTableNames = new ArrayList<String>();
+ for (Table table : mds.getTables(schema)) {
+ actualTableNames.add(table.getName());
+
+ List<String> actualColumnNames = new ArrayList<String>();
+ for (Column column : mds.getColumns(table)) {
+ actualColumnNames.add(column.getName());
+ }
+ Assert.assertTrue("Expected columns are missing from metadata store.", actualColumnNames.containsAll(expectedColumnsForTable.get(table.getName())));
+ Assert.assertTrue("Importer has not imported all expected columns.", expectedColumnsForTable.get(table.getName()).containsAll(actualColumnNames));
+ }
+ Assert.assertTrue("Expected tables are missing from metadata store.", actualTableNames.containsAll(expectedTablesForSchema.get(schema.getName())));
+ Assert.assertTrue("Importer has not imported all expected tables.", expectedTablesForSchema.get(schema.getName()).containsAll(actualTableNames));
+ }
+ Assert.assertTrue("Expected schemas are missing from metadata store.", actualSchemaNames.containsAll(expectedSchemasForDatabase.get(database.getName())));
+ Assert.assertTrue("Importer has not imported all expected schemas.", expectedSchemasForDatabase.get(database.getName()).containsAll(actualSchemaNames));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/internal/spark/SparkDiscoveryServiceLocalTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/internal/spark/SparkDiscoveryServiceLocalTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/internal/spark/SparkDiscoveryServiceLocalTest.java
new file mode 100755
index 0000000..ec0aa9a
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/internal/spark/SparkDiscoveryServiceLocalTest.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.integrationtest.metadata.internal.spark;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.metadata.MetadataQueryBuilder;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.metadata.models.DataFile;
+import org.apache.atlas.odf.api.metadata.models.DataSet;
+import org.apache.atlas.odf.api.metadata.models.RelationalDataSet;
+import org.apache.atlas.odf.api.metadata.models.Table;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint.SERVICE_INTERFACE_TYPE;
+import org.apache.atlas.odf.api.settings.SparkConfig;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+
+/**
+ * Integration test that registers one of the example Spark discovery services,
+ * runs it against the ODF sample data on a local Spark master ({@code "local"})
+ * and checks that the analysis request finishes and produces annotations.
+ */
+public class SparkDiscoveryServiceLocalTest extends ODFTestBase {
+    protected static Logger logger = Logger.getLogger(SparkDiscoveryServiceLocalTest.class.getName());
+    // The following settings are intentionally left public and non-final so that
+    // code outside of this class that reads or adjusts them keeps working.
+    public static int WAIT_MS_BETWEEN_POLLING = 2000;
+    public static int MAX_NUMBER_OF_POLLS = 400;
+    public static String DISCOVERY_SERVICE_ID = "spark-summary-statistics-example-service";
+    public static String DASHDB_DB = "BLUDB";
+    public static String DASHDB_SCHEMA = "SAMPLES";
+    public static String DASHDB_TABLE = "CUST_RETENTION_LIFE_DURATION";
+    public static enum DATASET_TYPE {
+        FILE, TABLE
+    }
+
+    /** Name of the sample data file that the file based tests are run on. */
+    private static final String SAMPLE_DATA_FILE_NAME = "BankClientsShort";
+    /** Location of the jar that contains the example Spark applications. */
+    private static final String SPARK_APPLICATION_JAR = "META-INF/spark/odf-spark-example-application-1.2.0-SNAPSHOT.jar";
+
+    /**
+     * Creates the ODF sample data unless a data file named "BankClientsShort"
+     * can already be found in the metadata store.
+     */
+    @BeforeClass
+    public static void createSampleData() throws Exception {
+        MetadataStore mds = new ODFFactory().create().getMetadataStore();
+        String query = mds.newQueryBuilder().objectType("DataFile").simpleCondition("name", MetadataQueryBuilder.COMPARATOR.EQUALS, SAMPLE_DATA_FILE_NAME).build();
+        if (mds.search(query).isEmpty()) {
+            mds.createSampleData();
+        }
+    }
+
+    /**
+     * @return a Spark configuration whose cluster master URL is {@code "local"}
+     */
+    public static SparkConfig getLocalSparkConfig() {
+        SparkConfig config = new SparkConfig();
+        config.setClusterMasterUrl("local");
+        return config;
+    }
+
+    /**
+     * @return registration info for the summary statistics example, which is
+     *         called through the DataFrame interface
+     */
+    public static DiscoveryServiceProperties getSparkSummaryStatisticsService() throws JSONException {
+        return createSparkServiceProperties("org.apache.atlas.odf.core.spark.SummaryStatistics", SERVICE_INTERFACE_TYPE.DataFrame);
+    }
+
+    /**
+     * @return registration info for the discovery service example, which is
+     *         called through the Generic interface
+     */
+    public static DiscoveryServiceProperties getSparkDiscoveryServiceExample() throws JSONException {
+        return createSparkServiceProperties("org.apache.atlas.odf.core.spark.SparkDiscoveryServiceExample", SERVICE_INTERFACE_TYPE.Generic);
+    }
+
+    /**
+     * Builds the registration info shared by both example services. The two
+     * services only differ in the Spark application class and its input method.
+     *
+     * @param className   fully qualified name of the Spark application class
+     * @param inputMethod interface type used to call the Spark application
+     */
+    private static DiscoveryServiceProperties createSparkServiceProperties(String className, SERVICE_INTERFACE_TYPE inputMethod) throws JSONException {
+        DiscoveryServiceProperties dsProperties = new DiscoveryServiceProperties();
+        dsProperties.setId(DISCOVERY_SERVICE_ID);
+        dsProperties.setName("Spark summary statistics service");
+        dsProperties.setDescription("Example discovery service calling summary statistics Spark application");
+        dsProperties.setCustomDescription("");
+        dsProperties.setIconUrl("spark.png");
+        dsProperties.setLink("http://www.spark.apache.org");
+        dsProperties.setPrerequisiteAnnotationTypes(null);
+        dsProperties.setResultingAnnotationTypes(null);
+        dsProperties.setSupportedObjectTypes(null);
+        dsProperties.setAssignedObjectTypes(null);
+        dsProperties.setAssignedObjectCandidates(null);
+        dsProperties.setParallelismCount(2);
+        DiscoveryServiceSparkEndpoint endpoint = new DiscoveryServiceSparkEndpoint();
+        endpoint.setJar(SPARK_APPLICATION_JAR);
+        endpoint.setClassName(className);
+        endpoint.setInputMethod(inputMethod);
+        dsProperties.setEndpoint(JSONUtils.convert(endpoint, DiscoveryServiceEndpoint.class));
+        return dsProperties;
+    }
+
+    /**
+     * Looks up the data file named "BankClientsShort" and fails the test if the
+     * metadata store does not contain it.
+     */
+    public static DataFile getTestDataFile(MetadataStore mds) {
+        DataFile dataSet = null;
+        List<MetaDataObjectReference> refs = mds.search(mds.newQueryBuilder().objectType("DataFile").build());
+        for (MetaDataObjectReference ref : refs) {
+            DataFile file = (DataFile) mds.retrieve(ref);
+            // Constant first so that objects without a name are skipped instead of causing an NPE
+            if (SAMPLE_DATA_FILE_NAME.equals(file.getName())) {
+                dataSet = file;
+                break;
+            }
+        }
+        Assert.assertNotNull("Data file " + SAMPLE_DATA_FILE_NAME + " not found in metadata store.", dataSet);
+        logger.log(Level.INFO, "Testing Spark discovery service on metadata object {0} (ref: {1})", new Object[] { dataSet.getName(), dataSet.getReference() });
+        return dataSet;
+    }
+
+    /**
+     * Looks up the table named {@link #DASHDB_TABLE} and fails the test if the
+     * metadata store does not contain it.
+     */
+    public static Table getTestTable(MetadataStore mds) {
+        Table dataSet = null;
+        List<MetaDataObjectReference> refs = mds.search(mds.newQueryBuilder().objectType("Table").build());
+        for (MetaDataObjectReference ref : refs) {
+            Table table = (Table) mds.retrieve(ref);
+            // Constant first so that objects without a name are skipped instead of causing an NPE
+            if (DASHDB_TABLE.equals(table.getName())) {
+                dataSet = table;
+                break;
+            }
+        }
+        Assert.assertNotNull("Table " + DASHDB_TABLE + " not found in metadata store.", dataSet);
+        logger.log(Level.INFO, "Testing Spark discovery service on metadata object {0} (ref: {1})", new Object[] { dataSet.getName(), dataSet.getReference() });
+        return dataSet;
+    }
+
+    /**
+     * @return an analysis request that runs the service registered under
+     *         {@link #DISCOVERY_SERVICE_ID} on the given data set
+     */
+    public static AnalysisRequest getSparkAnalysisRequest(DataSet dataSet) {
+        AnalysisRequest request = new AnalysisRequest();
+        List<MetaDataObjectReference> dataSetRefs = new ArrayList<>();
+        dataSetRefs.add(dataSet.getReference());
+        request.setDataSets(dataSetRefs);
+        List<String> serviceIds = Arrays.asList(DISCOVERY_SERVICE_ID);
+        request.setDiscoveryServiceSequence(serviceIds);
+        return request;
+    }
+
+    /**
+     * Registers the given discovery service, runs it on the selected test data
+     * set and asserts that the request finishes with at least one annotation.
+     * The service registration is removed again even if the test fails.
+     *
+     * @param sparkConfig     Spark configuration that is written to the ODF settings
+     * @param dataSetType     selects whether the sample file or the sample table is analyzed
+     * @param regInfo         registration info of the discovery service under test
+     * @param annotationNames NOTE(review): accepted for callers but currently not evaluated
+     *                        by this method; only the number of annotations is checked
+     */
+    public void runSparkServiceTest(SparkConfig sparkConfig, DATASET_TYPE dataSetType, DiscoveryServiceProperties regInfo, String[] annotationNames) throws Exception{
+        logger.info("Using Spark configuration: " + JSONUtils.toJSON(sparkConfig));
+        SettingsManager config = new ODFFactory().create().getSettingsManager();
+        ODFSettings settings = config.getODFSettings();
+        settings.setSparkConfig(sparkConfig);
+        config.updateODFSettings(settings);
+
+        logger.info("Using discovery service: " + JSONUtils.toJSON(regInfo));
+        DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager();
+        AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
+
+        try {
+            discoveryServicesManager.deleteDiscoveryService(DISCOVERY_SERVICE_ID);
+        } catch(ServiceNotFoundException e) {
+            // Ignore exception because service may not exist
+        }
+        discoveryServicesManager.createDiscoveryService(regInfo);
+
+        try {
+            MetadataStore mds = new ODFFactory().create().getMetadataStore();
+            Assert.assertNotNull(mds);
+            AnnotationStore as = new ODFFactory().create().getAnnotationStore();
+            Assert.assertNotNull(as);
+
+            RelationalDataSet dataSet = null;
+            if (dataSetType == DATASET_TYPE.FILE) {
+                dataSet = getTestDataFile(mds);
+            } else if (dataSetType == DATASET_TYPE.TABLE) {
+                dataSet = getTestTable(mds);
+            } else {
+                Assert.fail("Unsupported data set type: " + dataSetType);
+            }
+
+            logger.info("Using dataset: " + JSONUtils.toJSON(dataSet));
+
+            AnalysisRequest request = getSparkAnalysisRequest(dataSet);
+            logger.info("Using analysis request: " + JSONUtils.toJSON(request));
+
+            logger.info("Starting analysis...");
+            AnalysisResponse response = analysisManager.runAnalysis(request);
+            Assert.assertNotNull(response);
+            String requestId = response.getId();
+            Assert.assertNotNull(requestId);
+            logger.info("Request id is " + requestId + ".");
+
+            logger.info("Waiting for request to finish");
+            AnalysisRequestStatus status = waitForRequest(analysisManager, requestId);
+            Assert.assertEquals(AnalysisRequestStatus.State.FINISHED, status.getState());
+
+            List<Annotation> annots = as.getAnnotations(null, status.getRequest().getId());
+            logger.info("Number of annotations created: " + annots.size());
+            Assert.assertTrue("No annotations have been created.", annots.size() > 0);
+
+            logger.log(Level.INFO, "Request ''{0}'' is finished.", requestId);
+        } finally {
+            // Unregister the service on every exit path so that a failed assertion
+            // does not leave the registration behind for subsequent tests.
+            discoveryServicesManager.deleteDiscoveryService(DISCOVERY_SERVICE_ID);
+        }
+    }
+
+    /**
+     * Polls the status of the request until it leaves the pending states, the
+     * maximum number of polls is used up, or the waiting thread is interrupted.
+     * The status is always fetched at least once.
+     *
+     * @return the status returned by the last poll
+     */
+    private static AnalysisRequestStatus waitForRequest(AnalysisManager analysisManager, String requestId) {
+        AnalysisRequestStatus status;
+        int remainingPolls = MAX_NUMBER_OF_POLLS;
+        while (true) {
+            status = analysisManager.getAnalysisRequestStatus(requestId);
+            logger.log(Level.INFO, "Poll request for request ID ''{0}'', state: ''{1}'', details: ''{2}''", new Object[] { requestId, status.getState(), status.getDetails() });
+            remainingPolls--;
+            // Decide before sleeping so that no time is wasted once the result is known
+            if (remainingPolls <= 0 || !isPending(status.getState())) {
+                break;
+            }
+            try {
+                Thread.sleep(WAIT_MS_BETWEEN_POLLING);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag for the caller and stop waiting; continuing
+                // to poll would make every following sleep() fail immediately.
+                Thread.currentThread().interrupt();
+                logger.log(Level.WARNING, "Interrupted while waiting for request " + requestId, e);
+                break;
+            }
+        }
+        if (isPending(status.getState())) {
+            // The apostrophe is doubled because the message is processed by MessageFormat
+            logger.log(Level.INFO, "Request ''{0}'' is not finished yet, don''t wait for it", requestId);
+        }
+        return status;
+    }
+
+    /** Tells whether a request in the given state is still worth waiting for. */
+    private static boolean isPending(AnalysisRequestStatus.State state) {
+        return state == AnalysisRequestStatus.State.ACTIVE
+                || state == AnalysisRequestStatus.State.QUEUED
+                || state == AnalysisRequestStatus.State.NOT_FOUND;
+    }
+
+    @Test
+    public void testLocalSparkClusterWithLocalDataFile() throws Exception{
+        runSparkServiceTest(getLocalSparkConfig(), DATASET_TYPE.FILE, getSparkSummaryStatisticsService(), new String[] { "SparkSummaryStatisticsAnnotation", "SparkTableAnnotation" });
+    }
+
+    @Test
+    public void testLocalSparkClusterWithLocalDataFileAndDiscoveryServiceRequest() throws Exception{
+        runSparkServiceTest(getLocalSparkConfig(), DATASET_TYPE.FILE, getSparkDiscoveryServiceExample(), new String[] { "SparkSummaryStatisticsAnnotation", "SparkTableAnnotation" });
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/models/CachedMetadataStoreTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/models/CachedMetadataStoreTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/models/CachedMetadataStoreTest.java
new file mode 100755
index 0000000..4168b0e
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/integrationtest/metadata/models/CachedMetadataStoreTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.integrationtest.metadata.models;
+
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.integrationtest.metadata.MetadataStoreTestBase;
+import org.apache.atlas.odf.core.metadata.WritableMetadataStore;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.metadata.models.CachedMetadataStore;
+import org.apache.atlas.odf.api.metadata.models.DataFile;
+import org.apache.atlas.odf.api.metadata.models.DataFileFolder;
+import org.apache.atlas.odf.api.metadata.models.Database;
+import org.apache.atlas.odf.api.metadata.models.Table;
+import org.apache.atlas.odf.core.test.TimerTestBase;
+
+/**
+ * Runs the reference checks of {@link MetadataStoreTestBase} against metadata
+ * caches that were retrieved from a freshly populated metadata store.
+ */
+public class CachedMetadataStoreTest extends TimerTestBase {
+    static protected Logger logger = ODFTestLogger.get();
+
+    @Test
+    public void testMetaDataCache() throws Exception {
+        // Note that only a subset of the metadata store test cases are used here because the MetaDataCache does not support queries
+        WritableMetadataStore mds = MetadataStoreTestBase.getWritableMetadataStore();
+        // Start from a well-defined state: sample data plus the additional test data
+        mds.resetAllData();
+        mds.createSampleData();
+        MetadataStoreTestBase.createAdditionalTestData(mds);
+
+        Database database = MetadataStoreTestBase.getDatabaseTestObject(mds);
+        CachedMetadataStore databaseCache = new CachedMetadataStore(CachedMetadataStore.retrieveMetaDataCache(mds, database));
+        MetadataStoreTestBase.checkReferences(databaseCache, database);
+
+        Table table = MetadataStoreTestBase.getTableTestObject(mds);
+        CachedMetadataStore tableCache = new CachedMetadataStore(CachedMetadataStore.retrieveMetaDataCache(mds, table));
+        MetadataStoreTestBase.checkReferences(tableCache, table);
+
+        DataFileFolder folder = MetadataStoreTestBase.getDataFileFolderTestObject(mds);
+        CachedMetadataStore folderCache = new CachedMetadataStore(CachedMetadataStore.retrieveMetaDataCache(mds, folder));
+        MetadataStoreTestBase.checkReferences(folderCache, folder);
+
+        // NOTE(review): unlike the three checks above, the data file is checked against the
+        // underlying store and not against a cache - confirm whether this is intended.
+        DataFile file = MetadataStoreTestBase.getDataFileTestObject(mds);
+        MetadataStoreTestBase.checkReferences(mds, file);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFInternalFactoryTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFInternalFactoryTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFInternalFactoryTest.java
new file mode 100755
index 0000000..75d41c5
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFInternalFactoryTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.controlcenter.ExecutorServiceFactory;
+import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
+import org.junit.Test;
+
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.notification.NotificationManager;
+
+/**
+ * Checks that {@link ODFInternalFactory} is able to create an object for each
+ * of the internal interfaces listed in the test.
+ */
+public class ODFInternalFactoryTest extends TimerTestBase {
+
+    Logger logger = ODFTestLogger.get();
+
+    /**
+     * Asks the factory for an implementation of every listed interface and
+     * fails if the factory returns {@code null} for one of them. Exceptions
+     * thrown by the factory are propagated so that JUnit reports them together
+     * with their stack trace.
+     */
+    @Test
+    public void testFactoryInstantiations() throws Exception {
+        ODFInternalFactory factory = new ODFInternalFactory();
+        // Each interface is listed exactly once
+        Class<?>[] interfaces = new Class<?>[] { //
+                DiscoveryServiceQueueManager.class, //
+                ControlCenter.class, //
+                AnalysisRequestTrackerStore.class, //
+                ThreadManager.class, //
+                ExecutorServiceFactory.class, //
+                NotificationManager.class, //
+        };
+        for (Class<?> cl : interfaces) {
+            Object o = factory.create(cl);
+            // Name the interface in the failure message so that the culprit is obvious
+            assertNotNull("Factory returned no object for class " + cl.getName(), o);
+            logger.info("Object created for class " + cl.getName() + ": " + o.getClass().getName());
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestBase.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestBase.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestBase.java
new file mode 100755
index 0000000..867f0a9
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestBase.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test;
+
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.engine.SystemHealth;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.engine.EngineManager;
+
+/**
+ * All JUnit test cases that require proper Kafka setup should inherit from this class.
+ *
+ *
+ */
+public class ODFTestBase extends TimerTestBase {
+
+ static protected Logger log = ODFTestLogger.get();
+ @Test
+ public void testHealth() {
+ testHealth(true);
+ }
+
+ private void testHealth(boolean kafkaRunning) {
+ log.info("Starting health check...");
+ EngineManager engineManager = new ODFFactory().create().getEngineManager();
+ SystemHealth health = engineManager.checkHealthStatus();
+ if (!kafkaRunning) {
+ Assert.assertEquals(SystemHealth.HealthStatus.ERROR, health.getStatus());
+ } else {
+ Assert.assertEquals(SystemHealth.HealthStatus.OK, health.getStatus());
+ }
+ log.info("Health check finished");
+ }
+
+ @BeforeClass
+ public static void startup() throws Exception {
+ TestEnvironment.startAll();
+ }
+
+ @Before
+ public void setup() throws Exception {
+ testHealth(true);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ testHealth(true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestLogger.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestLogger.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestLogger.java
new file mode 100755
index 0000000..a845157
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestLogger.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test;
+
+import java.util.logging.Logger;
+
+public class ODFTestLogger {
+
+ public static Logger get() {
+ return Logger.getLogger(ODFTestLogger.class.getName());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestcase.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestcase.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestcase.java
new file mode 100755
index 0000000..525dc83
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/ODFTestcase.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test;
+
+import org.junit.BeforeClass;
+
+import org.apache.atlas.odf.api.ODFFactory;
+
+public class ODFTestcase extends TimerTestBase {
+ @BeforeClass
+ public static void setupBeforeClass() {
+ TestEnvironment.startAll();
+ // Initialize analysis manager
+ new ODFFactory().create().getAnalysisManager();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TestEnvironment.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TestEnvironment.java
new file mode 100755
index 0000000..06d407e
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TestEnvironment.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInitializer;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+
+/**
+ * The class can be used to start components required for testing.
+ *
+ * Components are plugged in as {@link TestEnvironmentInitializer} implementations
+ * that are looked up by class name and created reflectively.
+ */
+public class TestEnvironment {
+
+    static Logger logger = Logger.getLogger(TestEnvironment.class.getName());
+
+    public static String MESSAGING_CLASS = "org.apache.atlas.odf.core.test.messaging.kafka.TestEnvironmentMessagingInitializer";
+
+    /**
+     * Creates an instance of the named class through its no-argument constructor.
+     * The class is loaded with the class loader of this class.
+     *
+     * @param className fully qualified name of the class to instantiate
+     * @param clazz     type that the created object has to be an instance of
+     * @return the new object, or null if the class cannot be loaded, is not a
+     *         subtype of {@code clazz}, or cannot be instantiated; the cause is
+     *         logged as a warning
+     */
+    public static <T> T createObject(String className, Class<T> clazz) {
+        ClassLoader cl = TestEnvironment.class.getClassLoader();
+        // messaging
+        try {
+            // asSubclass() performs the type check right here instead of relying on an
+            // unchecked cast that would only fail later at the call site
+            Class<? extends T> tei = cl.loadClass(className).asSubclass(clazz);
+            return tei.getDeclaredConstructor().newInstance();
+        } catch (Exception exc) {
+            logger.log(Level.WARNING, "An exception occurred when starting the messaging test environment", exc);
+        }
+        return null;
+    }
+
+    /**
+     * Creates the initializer with the given class name and starts it. Nothing
+     * is started if the initializer could not be created.
+     */
+    public static void start(String className) {
+        TestEnvironmentInitializer initializer = createObject(className, TestEnvironmentInitializer.class);
+        if (initializer != null) {
+            initializer.start();
+        }
+    }
+
+    /**
+     * Starts the messaging test environment unless the property
+     * {@code odf.dont.start.messaging} of the ODF environment is set to "true".
+     */
+    public static void startMessaging() {
+        if ("true".equals(new ODFInternalFactory().create(Environment.class).getProperty("odf.dont.start.messaging"))) {
+            // do nothing
+            logger.info("Messaging test environment not started because environment variable odf.dont.start.messaging is set");
+        } else {
+            start(MESSAGING_CLASS);
+        }
+    }
+
+    /** Starts the messaging test environment followed by the ODF initializer. */
+    public static void startAll() {
+        startMessaging();
+        ODFInitializer.start();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TestEnvironmentInitializer.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TestEnvironmentInitializer.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TestEnvironmentInitializer.java
new file mode 100755
index 0000000..b4a0022
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TestEnvironmentInitializer.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test;
+
+/**
+ * Contract for a component of the test environment. Implementations are
+ * created by class name through {@link TestEnvironment#start(String)}, which
+ * requires them to be instantiable without constructor arguments.
+ */
+public interface TestEnvironmentInitializer {
+    /** Called by {@link TestEnvironment#start(String)} right after the initializer was created. */
+    void start();
+
+    /** Counterpart of {@link #start()}; presumably shuts the component down again (no caller is visible here). */
+    void stop();
+
+    /** @return a name for this initializer; presumably used for display or logging purposes - TODO confirm */
+    String getName();
+}