You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/06/28 05:57:24 UTC
[11/25] incubator-atlas git commit: ATLAS-1898: initial commit of ODF
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TimerTestBase.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TimerTestBase.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TimerTestBase.java
new file mode 100755
index 0000000..68740e4
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TimerTestBase.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import org.apache.wink.json4j.JSONException;
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.rules.Stopwatch;
+import org.junit.runner.Description;
+
+import com.google.common.io.Files;
+
+public class TimerTestBase {
+ static final String logFilePath = "/tmp/odf-test-execution-log.csv";
+ static Map<String, HashMap<String, Long>> testTimeMap = new HashMap<String, HashMap<String, Long>>();
+ final static Logger logger = ODFTestLogger.get();
+
+ @Rule
+ public Stopwatch timeWatcher = new Stopwatch() {
+ @Override
+ protected void finished(long nanos, Description description) {
+ HashMap<String, Long> testMap = testTimeMap.get(description.getClassName());
+ if (testMap == null) {
+ testMap = new HashMap<String, Long>();
+ testTimeMap.put(description.getClassName(), testMap);
+ }
+ testMap.put(description.getMethodName(), (nanos / 1000 / 1000));
+ }
+ };
+
+ @AfterClass
+ public static void tearDownAndLogTimes() throws JSONException {
+ try {
+ File logFile = new File(logFilePath);
+ Set<String> uniqueRows = new HashSet<String>();
+ if (logFile.exists()) {
+ uniqueRows = new HashSet<String>(Files.readLines(logFile, StandardCharsets.UTF_8));
+ }
+
+ for (Entry<String, HashMap<String, Long>> entry : testTimeMap.entrySet()) {
+ for (Entry<String, Long> testEntry : entry.getValue().entrySet()) {
+ String logRow = new StringBuilder().append(testEntry.getKey()).append(",").append(testEntry.getValue()).append(",").append(entry.getKey()).append(",")
+ .append(System.getProperty("odf.build.project.name", "ProjectNameNotDefined")).toString();
+ uniqueRows.add(logRow);
+ }
+ }
+
+ StringBuilder logContent = new StringBuilder();
+ Iterator<String> rowIterator = uniqueRows.iterator();
+ while (rowIterator.hasNext()) {
+ logContent.append(rowIterator.next());
+ if (rowIterator.hasNext()) {
+ logContent.append("\n");
+ }
+ }
+
+ logger.info("Total time consumed by succeeded tests:\n" + logContent.toString());
+ logFile.createNewFile();
+ Files.write(logContent.toString().getBytes("UTF-8"), logFile);
+ } catch (IOException e) {
+ logger.warning("Error writing test execution log");
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationExtensionTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationExtensionTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationExtensionTest.java
new file mode 100755
index 0000000..7a1f0ed
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationExtensionTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.annotation;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.fasterxml.jackson.core.Version;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.TimerTestBase;
+import org.apache.atlas.odf.json.AnnotationDeserializer;
+import org.apache.atlas.odf.json.AnnotationSerializer;
+
+public class AnnotationExtensionTest extends TimerTestBase {
+
+ static Logger logger = ODFTestLogger.get();
+
+ public static <T> T readJSONObjectFromFileInClasspath(ObjectMapper om, Class<T> cl, String pathToFile, ClassLoader classLoader) {
+ if (classLoader == null) {
+ // use current classloader if not provided
+ classLoader = AnnotationExtensionTest.class.getClassLoader();
+ }
+ InputStream is = classLoader.getResourceAsStream(pathToFile);
+ T result = null;
+ try {
+ result = om.readValue(is, cl);
+ } catch (IOException e) {
+ // assume that this is a severe error since the provided JSONs should be correct
+ throw new RuntimeException(e);
+ }
+
+ return result;
+ }
+
+ @Test
+ public void testWithUtils() throws Exception {
+ testSimple(JSONUtils.getGlobalObjectMapper());
+ }
+
+ @Test
+ public void testWithSeparateObjectMapper() throws Exception {
+ ObjectMapper om = new ObjectMapper();
+ SimpleModule mod = new SimpleModule("annotation module", Version.unknownVersion());
+ mod.addDeserializer(Annotation.class, new AnnotationDeserializer());
+ mod.addSerializer(Annotation.class, new AnnotationSerializer());
+ om.registerModule(mod);
+ testSimple(om);
+ }
+
+ private void testSimple(ObjectMapper om) throws Exception {
+ ExtensionTestAnnotation newTestAnnot = new ExtensionTestAnnotation();
+ String strValue = "newstring1";
+ int intValue = 4237;
+ newTestAnnot.setNewStringProp1(strValue);
+ newTestAnnot.setNewIntProp2(intValue);
+// String newTestAnnotJSON = om.writeValueAsString(newTestAnnot);
+ String newTestAnnotJSON = JSONUtils.toJSON(newTestAnnot).toString();
+ logger.info("New test annot JSON: " + newTestAnnotJSON);
+
+ logger.info("Deserializing with " + Annotation.class.getSimpleName() + "class as target class");
+ Annotation annot1 = om.readValue(newTestAnnotJSON, Annotation.class);
+ Assert.assertNotNull(annot1);
+ logger.info("Deserialized annotation JSON (target: " + Annotation.class.getSimpleName() + "): " + om.writeValueAsString(annot1));
+ logger.info("Deserialized annotation class (target: " + Annotation.class.getSimpleName() + "): " + annot1.getClass().getName());
+ Assert.assertEquals(ExtensionTestAnnotation.class, annot1.getClass());
+ ExtensionTestAnnotation extAnnot1 = (ExtensionTestAnnotation) annot1;
+ Assert.assertEquals(strValue, extAnnot1.getNewStringProp1());
+ Assert.assertEquals(intValue, extAnnot1.getNewIntProp2());
+
+ /* This does not make sense as you would never enter ExtensionTestAnnotation.class as deserialization target
+ * which would enforce usage of the standard Bean serializer (since no serializer is registered for this specific class -> jsonProperties can not be mapped
+ logger.info("Calling deserialization with " + ExtensionTestAnnotation.class.getSimpleName() + " as target");
+ ExtensionTestAnnotation annot2 = om.readValue(newTestAnnotJSON, ExtensionTestAnnotation.class);
+ Assert.assertNotNull(annot2);
+ logger.info("Deserialized annotation JSON (target: " + ExtensionTestAnnotation.class.getSimpleName() + "): " + om.writeValueAsString(annot2));
+ logger.info("Deserialized annotation class (target: " + ExtensionTestAnnotation.class.getSimpleName() + "): " + annot2.getClass().getName());
+ Assert.assertEquals(ExtensionTestAnnotation.class, annot2.getClass());
+ String s = annot2.getNewStringProp1();
+ Assert.assertEquals(strValue, annot2.getNewStringProp1());
+ Assert.assertEquals(intValue, annot2.getNewIntProp2()); */
+
+ logger.info("Processing profiling annotation...");
+ Annotation unknownAnnot = readJSONObjectFromFileInClasspath(om, Annotation.class, "org/apache/atlas/odf/core/test/annotation/annotexttest1.json", null);
+ Assert.assertNotNull(unknownAnnot);
+ logger.info("Read Unknown annotation: " + unknownAnnot.getClass().getName());
+ Assert.assertEquals(ProfilingAnnotation.class, unknownAnnot.getClass());
+
+ logger.info("Read profiling annotation: " + om.writeValueAsString(unknownAnnot));
+ JSONObject jsonPropertiesObj = new JSONObject(unknownAnnot.getJsonProperties());
+ Assert.assertEquals("newProp1Value", jsonPropertiesObj.get("newProp1"));
+ Assert.assertEquals((Integer) 4237, jsonPropertiesObj.get("newProp2"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationStoreTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationStoreTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationStoreTest.java
new file mode 100755
index 0000000..b65ce17
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationStoreTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.annotation;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+
+public class AnnotationStoreTest extends ODFTestcase {
+
+ private AnnotationStore createAnnotationStore() {
+ return new DefaultStatusQueueStore();
+ }
+
+ @Test
+ public void testStoreProfilingAnnotation() throws Exception {
+ AnnotationStore as = createAnnotationStore();
+
+ String modRef1Id = UUID.randomUUID().toString();
+ MetaDataObjectReference mdoref1 = new MetaDataObjectReference();
+ mdoref1.setId(modRef1Id);
+
+ ProfilingAnnotation annot1 = new ProfilingAnnotation();
+ annot1.setJsonProperties("{\"a\": \"b\"}");
+ annot1.setAnnotationType("AnnotType1");
+ annot1.setProfiledObject(mdoref1);
+
+ MetaDataObjectReference annot1Ref = as.store(annot1);
+ Assert.assertNotNull(annot1Ref.getId());
+ List<Annotation> retrievedAnnots = as.getAnnotations(mdoref1, null);
+ Assert.assertEquals(1, retrievedAnnots.size());
+
+ Annotation retrievedAnnot = retrievedAnnots.get(0);
+ Assert.assertTrue(annot1 != retrievedAnnot);
+ Assert.assertTrue(retrievedAnnot instanceof ProfilingAnnotation);
+ ProfilingAnnotation retrievedProfilingAnnotation = (ProfilingAnnotation) retrievedAnnot;
+ Assert.assertEquals(modRef1Id, retrievedProfilingAnnotation.getProfiledObject().getId());
+ Assert.assertEquals(annot1Ref, retrievedAnnot.getReference());
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/ExtensionTestAnnotation.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/ExtensionTestAnnotation.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/ExtensionTestAnnotation.java
new file mode 100755
index 0000000..cd8f695
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/ExtensionTestAnnotation.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.annotation;
+
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+
+class ExtensionTestAnnotation extends ProfilingAnnotation {
+
+ private String newStringProp1;
+ private int newIntProp2;
+
+ public String getNewStringProp1() {
+ return newStringProp1;
+ }
+
+ public void setNewStringProp1(String newStringProp1) {
+ this.newStringProp1 = newStringProp1;
+ }
+
+ public int getNewIntProp2() {
+ return newIntProp2;
+ }
+
+ public void setNewIntProp2(int newIntProp2) {
+ this.newIntProp2 = newIntProp2;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingExtendedAnnotations.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingExtendedAnnotations.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingExtendedAnnotations.java
new file mode 100755
index 0000000..f65e3ad
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingExtendedAnnotations.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.annotation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResult;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+
+public class TestSyncDiscoveryServiceWritingExtendedAnnotations extends DiscoveryServiceBase implements SyncDiscoveryService {
+ Logger logger = ODFTestLogger.get();
+
+ public static class SyncDiscoveryServiceAnnotation extends ProfilingAnnotation {
+ private String prop1 = "";
+ private int prop2 = 4237;
+ private MyObject prop3 = new MyObject();
+
+ public String getProp1() {
+ return prop1;
+ }
+
+ public void setProp1(String prop1) {
+ this.prop1 = prop1;
+ }
+
+ public int getProp2() {
+ return prop2;
+ }
+
+ public void setProp2(int prop2) {
+ this.prop2 = prop2;
+ }
+
+ public MyObject getProp3() {
+ return prop3;
+ }
+
+ public void setProp3(MyObject prop3) {
+ this.prop3 = prop3;
+ }
+
+ }
+
+ public static class MyObject {
+ private String anotherProp = "";
+
+ public String getAnotherProp() {
+ return anotherProp;
+ }
+
+ public void setAnotherProp(String anotherProp) {
+ this.anotherProp = anotherProp;
+ }
+
+ private MyOtherObject yetAnotherProp = new MyOtherObject();
+
+ public MyOtherObject getYetAnotherProp() {
+ return yetAnotherProp;
+ }
+
+ public void setYetAnotherProp(MyOtherObject yetAnotherProp) {
+ this.yetAnotherProp = yetAnotherProp;
+ }
+
+ }
+
+ public static class MyOtherObject {
+ private String myOtherObjectProperty = "";
+
+ public String getMyOtherObjectProperty() {
+ return myOtherObjectProperty;
+ }
+
+ public void setMyOtherObjectProperty(String myOtherObjectProperty) {
+ this.myOtherObjectProperty = myOtherObjectProperty;
+ }
+
+ }
+
+ @Override
+ public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
+ try {
+ MetaDataObjectReference dataSetRef = request.getDataSetContainer().getDataSet().getReference();
+
+ List<Annotation> annotations = new ArrayList<>();
+ SyncDiscoveryServiceAnnotation annotation1 = new SyncDiscoveryServiceAnnotation();
+ String annotation1_prop1 = "prop1_1_" + dataSetRef.getUrl();
+ annotation1.setProp1(annotation1_prop1);
+ annotation1.setProp2(annotation1_prop1.hashCode());
+ annotation1.setProfiledObject(dataSetRef);
+ MyObject mo1 = new MyObject();
+ MyOtherObject moo1 = new MyOtherObject();
+ moo1.setMyOtherObjectProperty("nestedtwolevels" + annotation1_prop1);
+ mo1.setYetAnotherProp(moo1);
+ mo1.setAnotherProp("nested" + annotation1_prop1);
+ annotation1.setProp3(mo1);
+ annotations.add(annotation1);
+
+ SyncDiscoveryServiceAnnotation annotation2 = new SyncDiscoveryServiceAnnotation();
+ String annotation2_prop1 = "prop1_2_" + dataSetRef.getUrl();
+ annotation2.setProp1(annotation2_prop1);
+ annotation2.setProp2(annotation2_prop1.hashCode());
+ annotation2.setProfiledObject(dataSetRef);
+ MyObject mo2 = new MyObject();
+ MyOtherObject moo2 = new MyOtherObject();
+ moo2.setMyOtherObjectProperty("nestedtwolevels" + annotation2_prop1);
+ mo2.setYetAnotherProp(moo2);
+ mo2.setAnotherProp("nested" + annotation2_prop1);
+ annotation2.setProp3(mo2);
+ annotations.add(annotation2);
+
+ DiscoveryServiceSyncResponse resp = new DiscoveryServiceSyncResponse();
+ resp.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+ DiscoveryServiceResult dsResult = new DiscoveryServiceResult();
+ dsResult.setAnnotations(annotations);
+ resp.setResult(dsResult);
+ resp.setDetails(this.getClass().getName() + ".runAnalysis finished OK");
+
+ logger.info("Returning from discovery service " + this.getClass().getSimpleName() + " with result: " + JSONUtils.toJSON(resp));
+ return resp;
+ } catch (Exception exc) {
+ throw new RuntimeException(exc);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingJsonAnnotations.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingJsonAnnotations.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingJsonAnnotations.java
new file mode 100755
index 0000000..91b544c
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingJsonAnnotations.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.annotation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResult;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+
+public class TestSyncDiscoveryServiceWritingJsonAnnotations extends DiscoveryServiceBase implements SyncDiscoveryService {
+ Logger logger = ODFTestLogger.get();
+ private String annotationResult = Utils.getInputStreamAsString(this.getClass().getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/integrationtest/metadata/internal/atlas/nested_annotation_example.json"), "UTF-8");
+
+ @Override
+ public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
+ try {
+ MetaDataObjectReference dataSetRef = request.getDataSetContainer().getDataSet().getReference();
+
+ List<Annotation> annotations = new ArrayList<>();
+ ProfilingAnnotation annotation1 = new ProfilingAnnotation();
+ annotation1.setProfiledObject(dataSetRef);
+ annotation1.setJsonProperties(annotationResult);
+ annotation1.setAnnotationType("JsonAnnotationWriteTest");
+ annotation1.setJavaClass("JsonAnnotationWriteTest");
+ annotations.add(annotation1);
+
+ DiscoveryServiceSyncResponse resp = new DiscoveryServiceSyncResponse();
+ resp.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+ DiscoveryServiceResult dsResult = new DiscoveryServiceResult();
+ dsResult.setAnnotations(annotations);
+ resp.setResult(dsResult);
+ resp.setDetails(this.getClass().getName() + ".runAnalysis finished OK");
+
+ logger.info("Returning from discovery service " + this.getClass().getSimpleName() + " with result: " + JSONUtils.toJSON(resp));
+ return resp;
+ } catch (Exception exc) {
+ throw new RuntimeException(exc);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ODFConfigurationTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ODFConfigurationTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ODFConfigurationTest.java
new file mode 100755
index 0000000..b1d2518
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ODFConfigurationTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.configuration;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException;
+import org.apache.atlas.odf.api.settings.KafkaMessagingConfiguration;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+import org.apache.atlas.odf.core.configuration.ConfigManager;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * this test uses a mocked storage therefore no zookeeper is required
+ */
+public class ODFConfigurationTest extends ODFTestcase {
+
+ Logger logger = ODFTestLogger.get();
+
+ @Before
+ public void setupDefaultConfig() throws JsonParseException, JsonMappingException, IOException, ValidationException, JSONException {
+ logger.info("reset config to default");
+ InputStream is = ODFConfigurationTest.class.getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/test/internal/odf-initial-configuration.json");
+ ConfigContainer defaultConfig = new ObjectMapper().readValue(is, ConfigContainer.class);
+ ConfigManager configManager = new ODFInternalFactory().create(ConfigManager.class);
+ configManager.updateConfigContainer(defaultConfig);
+ }
+
+ @Test
+ public void testUserDefinedMerge() throws JsonParseException, JsonMappingException, IOException {
+ InputStream is = ODFConfigurationTest.class.getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/test/internal/odf-initial-configuration.json");
+ ConfigContainer defaultConfig;
+ defaultConfig = new ObjectMapper().readValue(is, ConfigContainer.class);
+ //set testProps to defaultValues to be overwritten
+ defaultConfig.getOdf().getUserDefined().put("testProp", "defaultValue");
+ defaultConfig.getOdf().getUserDefined().put("testProp2", "defaultValue");
+ logger.info("Read config: " + defaultConfig);
+
+ //config example with userdefined property testProp to 123
+ String value = "{\r\n\t\"odf\" : {\r\n\t\"userDefined\" : {\r\n\t\t\"testProp\" : 123\r\n\t}\r\n}\r\n}\r\n";
+ ConfigContainer props = new ObjectMapper().readValue(value, ConfigContainer.class);
+ Utils.mergeODFPOJOs(defaultConfig, props);
+ logger.info("Mergded config: " + defaultConfig);
+
+ Assert.assertEquals(123, defaultConfig.getOdf().getUserDefined().get("testProp"));
+ Assert.assertEquals("defaultValue", defaultConfig.getOdf().getUserDefined().get("testProp2"));
+ }
+
+ @Test
+ public void testValidation() throws JsonParseException, JsonMappingException, IOException {
+ boolean exceptionOccured = false;
+ String value = "{\r\n\t\"odf\" : {\r\n\t\t\"discoveryServiceWatcherWaitMs\" : -5\r\n\t}\r\n}\r\n";
+ try {
+ ConfigContainer props = new ObjectMapper().readValue(value, ConfigContainer.class);
+ props.validate();
+ } catch (ValidationException e) {
+ exceptionOccured = true;
+ }
+
+ Assert.assertTrue(exceptionOccured);
+ }
+
+ @Test
+ public void testMerge() throws JsonParseException, JsonMappingException, IOException {
+ InputStream is = ODFConfigurationTest.class.getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/test/internal/odf-initial-configuration.json");
+ ConfigContainer defaultConfig;
+ defaultConfig = new ObjectMapper().readValue(is, ConfigContainer.class);
+ //config example with ODF - queueConsumerWaitMs property value 777
+ String value = "{\r\n\t\"odf\" : {\r\n\t\t\"discoveryServiceWatcherWaitMs\" : 777\r\n\t}\r\n}\r\n";
+ ConfigContainer props = new ObjectMapper().readValue(value, ConfigContainer.class);
+ Utils.mergeODFPOJOs(defaultConfig, props);
+
+ // TODOCONFIG, move next line to kafka tests
+ // Assert.assertEquals(777, defaultConfig.getOdf().getQueueConsumerWaitMs().intValue());
+ }
+
+ @Test
+ public void testDeepMerge() throws JsonParseException, JsonMappingException, IOException {
+ InputStream is = ODFConfigurationTest.class.getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/test/internal/odf-initial-configuration.json");
+ ConfigContainer defaultConfig;
+ defaultConfig = new ObjectMapper().readValue(is, ConfigContainer.class);
+ //config example with ODF - kafkaConsumer - offsetsStorage property value TEST. All other values for the kafkaConsumer should stay the same!
+ String value = "{\r\n\t\"odf\" : {\r\n\"messagingConfiguration\": { \"type\": \"" + KafkaMessagingConfiguration.class.getName()
+ + "\", \t\t\"kafkaConsumerConfig\" : { \r\n\t\t\t\"offsetsStorage\" : \"TEST\"\r\n\t\t}\r\n\t}\r\n}}\r\n";
+ ConfigContainer props = new ObjectMapper().readValue(value, ConfigContainer.class);
+ Utils.mergeODFPOJOs(defaultConfig, props);
+
+ // TODOCONFIG
+ // Assert.assertEquals("TEST", defaultConfig.getOdf().getKafkaConsumerConfig().getOffsetsStorage());
+ //make sure the rest is still default
+ // Assert.assertEquals(400, defaultConfig.getOdf().getKafkaConsumerConfig().getZookeeperSessionTimeoutMs().intValue());
+ }
+
+ @Test
+ public void testGet() {
+ Assert.assertTrue(new ODFFactory().create().getSettingsManager().getODFSettings().isReuseRequests());
+ }
+
+ @Test
+ public void testPut() throws InterruptedException, IOException, ValidationException, JSONException, ServiceNotFoundException {
+ SettingsManager config = new ODFFactory().create().getSettingsManager();
+ String propertyId = "my_dummy_test_property";
+ int testNumber = 123;
+ Map<String, Object> cont = config.getUserDefinedConfig();
+ cont.put(propertyId, testNumber);
+ config.updateUserDefined(cont);
+ Assert.assertEquals(testNumber, config.getUserDefinedConfig().get(propertyId));
+
+ String testString = "test";
+ cont.put(propertyId, testString);
+ config.updateUserDefined(cont);
+
+ Assert.assertEquals(testString, config.getUserDefinedConfig().get(propertyId));
+
+ JSONObject testJson = new JSONObject();
+ testJson.put("testProp", "test");
+ cont.put(propertyId, testJson);
+ config.updateUserDefined(cont);
+
+ Assert.assertEquals(testJson, config.getUserDefinedConfig().get(propertyId));
+
+ ODFSettings settings = config.getODFSettings();
+ logger.info("Last update object: " + JSONUtils.toJSON(settings));
+ Assert.assertNotNull(settings);
+ Assert.assertNotNull(settings.getUserDefined());
+ Assert.assertNotNull(settings.getUserDefined().get(propertyId));
+ logger.info("User defined object: " + settings.getUserDefined().get(propertyId).getClass());
+ @SuppressWarnings("unchecked")
+ Map<String, Object> notifiedNestedJSON = (Map<String, Object>) settings.getUserDefined().get(propertyId);
+ Assert.assertNotNull(notifiedNestedJSON.get("testProp"));
+ Assert.assertTrue(notifiedNestedJSON.get("testProp") instanceof String);
+ Assert.assertEquals("test", notifiedNestedJSON.get("testProp"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/PasswordEncryptionTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/PasswordEncryptionTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/PasswordEncryptionTest.java
new file mode 100755
index 0000000..aea9a30
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/PasswordEncryptionTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.configuration;
+
+import java.util.logging.Logger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.core.Encryption;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.api.settings.SparkConfig;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.TimerTestBase;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class PasswordEncryptionTest extends TimerTestBase {
+ Logger logger = ODFTestLogger.get();
+ private static final String SPARK_PASSWORD_CONFIG = "spark.authenticate.secret";
+
+ @Test
+ public void testGeneralPasswordEncryption() throws Exception {
+ SettingsManager settings = new ODFFactory().create().getSettingsManager();
+ ODFSettings settingsWithPlainPasswords = settings.getODFSettingsHidePasswords();
+ settingsWithPlainPasswords.setOdfPassword("newOdfPassword");
+ logger.info("Settings with plain password: " + JSONUtils.toJSON(settingsWithPlainPasswords));
+ settings.updateODFSettings(settingsWithPlainPasswords);
+
+ ODFSettings settingsWithHiddenPasswords = settings.getODFSettingsHidePasswords();
+ String hiddenPasswordIdentifyier = "***hidden***";
+ Assert.assertEquals(hiddenPasswordIdentifyier, settingsWithHiddenPasswords.getOdfPassword());
+ logger.info("Settings with hidden password: " + JSONUtils.toJSON(settingsWithHiddenPasswords));
+
+ ODFSettings settingsWithEncryptedPassword = settings.getODFSettings();
+ Assert.assertEquals("newOdfPassword", Encryption.decryptText(settingsWithEncryptedPassword.getOdfPassword()));
+ logger.info("Settings with encrypted password: " + JSONUtils.toJSON(settingsWithEncryptedPassword));
+
+ // When overwriting settings with hidden passwords, encrypted passwords must be kept internally
+ settings.updateODFSettings(settingsWithHiddenPasswords);
+ settingsWithEncryptedPassword = settings.getODFSettings();
+ Assert.assertEquals("newOdfPassword", Encryption.decryptText(settingsWithEncryptedPassword.getOdfPassword()));
+ }
+
+ @Test
+ public void testSparkConfigEncryption() throws Exception {
+ SettingsManager settings = new ODFFactory().create().getSettingsManager();
+ SparkConfig plainSparkConfig = new SparkConfig();
+ plainSparkConfig.setConfig(SPARK_PASSWORD_CONFIG, "plainConfigValue");
+ ODFSettings settingsWithPlainPasswords = settings.getODFSettings();
+ settingsWithPlainPasswords.setSparkConfig(plainSparkConfig);;
+ logger.info("Settings with plain password: " + JSONUtils.toJSON(settingsWithPlainPasswords));
+ settings.updateODFSettings(settingsWithPlainPasswords);
+
+ ODFSettings settingsWithHiddenPasswords = settings.getODFSettingsHidePasswords();
+ String hiddenPasswordIdentifyier = "***hidden***";
+ String hiddenConfigValue = (String) settingsWithHiddenPasswords.getSparkConfig().getConfigs().get(SPARK_PASSWORD_CONFIG);
+ Assert.assertEquals(hiddenPasswordIdentifyier, hiddenConfigValue);
+ logger.info("Config with hidden password: " + JSONUtils.toJSON(settingsWithHiddenPasswords));
+
+ ODFSettings settingsWithEncryptedPassword = settings.getODFSettings();
+ String encryptedConfigValue = (String) settingsWithEncryptedPassword.getSparkConfig().getConfigs().get(SPARK_PASSWORD_CONFIG);
+ Assert.assertEquals("plainConfigValue", Encryption.decryptText(encryptedConfigValue));
+ logger.info("Config with encrypted password: " + JSONUtils.toJSON(settingsWithEncryptedPassword));
+
+ // When overwriting settings with hidden passwords, encrypted passwords must be kept internally
+ settings.updateODFSettings(settingsWithHiddenPasswords);
+ encryptedConfigValue = (String) settingsWithEncryptedPassword.getSparkConfig().getConfigs().get(SPARK_PASSWORD_CONFIG);
+ Assert.assertEquals("plainConfigValue", Encryption.decryptText(encryptedConfigValue));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ValidationTests.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ValidationTests.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ValidationTests.java
new file mode 100755
index 0000000..3db5778
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ValidationTests.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.configuration;
+
+import java.util.Collections;
+
+import org.apache.atlas.odf.api.settings.validation.EnumValidator;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+import org.apache.atlas.odf.core.configuration.ConfigManager;
+import org.apache.atlas.odf.core.configuration.ServiceValidator;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.settings.validation.ImplementationValidator;
+import org.apache.atlas.odf.api.settings.validation.NumberPositiveValidator;
+import org.apache.atlas.odf.api.settings.validation.PropertyValidator;
+import org.apache.atlas.odf.core.test.TimerTestBase;
+import org.apache.atlas.odf.core.test.discoveryservice.TestAsyncDiscoveryServiceWritingAnnotations1;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class ValidationTests extends TimerTestBase {
+
+ @Test
+ public void testEnum() {
+ String[] vals = new String[] { "test", "test2" };
+ String correct = "test";
+ String incorrect = "fail";
+
+ Assert.assertTrue(validateTest(correct, new EnumValidator(vals)));
+ Assert.assertFalse(validateTest(incorrect, new EnumValidator(vals)));
+ }
+
+ @Test
+ public void testImplementation() {
+ String correct = TestAsyncDiscoveryServiceWritingAnnotations1.class.getName();
+ String incorrect = "dummyClass";
+ Assert.assertTrue(validateTest(correct, new ImplementationValidator()));
+ Assert.assertFalse(validateTest(incorrect, new ImplementationValidator()));
+ }
+
+ @Test
+ public void testService() throws Exception {
+ String s = "{\r\n" +
+ " \"id\": \"asynctestservice\",\r\n" +
+ " \"name\": \"Async test\",\r\n" +
+ " \"description\": \"The async test service\",\r\n" +
+ " \"endpoint\": {\r\n" +
+ " \"runtimeName\": \"Java\",\r\n" +
+ " \"className\": \"TestAsyncDiscoveryService1\"\r\n" +
+ " }\r\n" +
+ " }";
+
+ DiscoveryServiceProperties newService = JSONUtils.fromJSON(s, DiscoveryServiceProperties.class);
+ DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager();
+ //ODFConfig odfConfig = new ODFFactory().create(ODFConfiguration.class).getODFConfig();
+
+ ConfigContainer new1 = new ConfigContainer();
+ new1.setRegisteredServices(Collections.singletonList(newService));
+ ConfigManager configManager = new ODFInternalFactory().create(ConfigManager.class);
+ configManager.updateConfigContainer(new1);
+
+ DiscoveryServiceProperties correct = discoveryServicesManager.getDiscoveryServicesProperties().get(0);
+ Assert.assertEquals("asynctestservice", correct.getId());
+ correct.setId("newId");
+ DiscoveryServiceProperties incorrect = new DiscoveryServiceProperties();
+ Assert.assertTrue(validateTest(correct, new ServiceValidator()));
+ Assert.assertFalse(validateTest(incorrect, new ServiceValidator()));
+ }
+
+ @Test
+ public void testNumber() {
+ int correct = 5;
+ int incorrect = -5;
+ Assert.assertTrue(validateTest(correct, new NumberPositiveValidator()));
+ Assert.assertFalse(validateTest(incorrect, new NumberPositiveValidator()));
+ }
+
+ private boolean validateTest(Object value, PropertyValidator validator) {
+ try {
+ validator.validate(null, value);
+ return true;
+ } catch (ValidationException ex) {
+ return false;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisProcessingTests.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisProcessingTests.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisProcessingTests.java
new file mode 100755
index 0000000..4fa2eda
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisProcessingTests.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import java.text.MessageFormat;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisCancelResult;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.api.settings.MessagingConfiguration;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class AnalysisProcessingTests extends ODFTestcase {
+ Logger logger = ODFTestLogger.get();
+
+ @Test
+ public void testAnalysisProcessingAfterShutdown() throws Exception {
+ final SettingsManager config = new ODFFactory().create().getSettingsManager();
+ final ODFSettings odfSettings = config.getODFSettings();
+ final MessagingConfiguration messagingConfiguration = odfSettings.getMessagingConfiguration();
+ final Long origRequestRetentionMs = messagingConfiguration.getAnalysisRequestRetentionMs();
+ messagingConfiguration.setAnalysisRequestRetentionMs(300000l);
+ config.updateODFSettings(odfSettings);
+
+ ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class);
+ AnalysisRequestTracker tracker = JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, "org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json", null);
+ AnalysisRequest req = tracker.getRequest();
+ req.setDiscoveryServiceSequence(Arrays.asList("asynctestservice"));
+ req.getDataSets().get(0).setId(ODFAPITest.DUMMY_SUCCESS_ID + "_dataset");
+ final AnalysisResponse startRequest = cc.startRequest(req);
+ logger.info("Analysis :" + startRequest.getId());
+
+ Assert.assertNull(startRequest.getOriginalRequest());
+ Assert.assertFalse(startRequest.isInvalidRequest());
+ final AnalysisResponse duplicate = cc.startRequest(req);
+ Assert.assertNotNull(duplicate.getOriginalRequest());
+ Assert.assertEquals(startRequest.getId(), duplicate.getId());
+ logger.info("Analysis1 duplciate :" + duplicate.getId());
+
+ final AnalysisCancelResult cancelRequest = cc.cancelRequest(startRequest.getId());
+ Assert.assertEquals(AnalysisCancelResult.State.SUCCESS, cancelRequest.getState());
+
+ cc.getQueueManager().stop();
+
+ AnalysisResponse response2 = cc.startRequest(req);
+ logger.info("Analysis2:" + response2.getId());
+ AnalysisRequestStatus requestStatus = cc.getRequestStatus(response2.getId());
+ int maxWait = 20;
+
+ int currentWait = 0;
+ while (currentWait < maxWait && requestStatus.getState() != AnalysisRequestStatus.State.ACTIVE) {
+ Thread.sleep(100);
+ currentWait++;
+ requestStatus = cc.getRequestStatus(response2.getId());
+ }
+ logger.info("THREAD ACTIVE, KILL IT!");
+
+ cc.getQueueManager().start();
+ logger.info("restarted");
+ Assert.assertNull(response2.getOriginalRequest());
+ Assert.assertFalse(response2.isInvalidRequest());
+
+ messagingConfiguration.setAnalysisRequestRetentionMs(origRequestRetentionMs);
+ config.updateODFSettings(odfSettings);
+
+ currentWait = 0;
+ while (currentWait < maxWait && requestStatus.getState() != AnalysisRequestStatus.State.FINISHED) {
+ Thread.sleep(100);
+ requestStatus = cc.getRequestStatus(response2.getId());
+ }
+ Assert.assertEquals(AnalysisRequestStatus.State.FINISHED, requestStatus.getState());
+ }
+
+ @Test
+ public void testRequestWithAnnotationTypes() throws Exception {
+ ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class);
+ AnalysisRequestTracker tracker = JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, "org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json", null);
+ AnalysisRequest req = tracker.getRequest();
+ req.getDataSets().get(0).setId(ODFAPITest.DUMMY_SUCCESS_ID + "_dataset");
+ List<String> annotationTypes = Arrays.asList(new String[] { "AsyncTestDummyAnnotation" });
+ req.setAnnotationTypes(annotationTypes);
+ logger.info(MessageFormat.format("Running discovery request for annotation type {0}.", annotationTypes));
+ AnalysisResponse resp = cc.startRequest(req);
+ logger.info(MessageFormat.format("Started request id {0}.", resp.getId()));
+ Assert.assertNotNull(resp.getId());
+ Assert.assertFalse(resp.isInvalidRequest());
+
+ int currentWait = 0;
+ int maxWait = 20;
+ AnalysisRequestStatus requestStatus = cc.getRequestStatus(resp.getId());
+ while (currentWait < maxWait && requestStatus.getState() != AnalysisRequestStatus.State.FINISHED) {
+ Thread.sleep(100);
+ requestStatus = cc.getRequestStatus(resp.getId());
+ }
+ Assert.assertEquals(AnalysisRequestStatus.State.FINISHED, requestStatus.getState());
+ Assert.assertEquals("Generated service has incorrect number of elements.", 1, requestStatus.getRequest().getDiscoveryServiceSequence().size());
+ Assert.assertEquals("Generated service sequence differs from expected value.", "asynctestservice", requestStatus.getRequest().getDiscoveryServiceSequence().get(0));
+ }
+
+ @Test
+ public void testRequestWithMissingAnnotationTypes() throws Exception {
+ ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class);
+ AnalysisRequestTracker tracker = JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, "org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json", null);
+ AnalysisRequest req = tracker.getRequest();
+ req.getDataSets().get(0).setId(ODFAPITest.DUMMY_SUCCESS_ID + "_dataset");
+ List<String> annotationTypes = Arrays.asList(new String[] { "noServiceExistsForThisAnnotationType" });
+ req.setAnnotationTypes(annotationTypes);
+ logger.info(MessageFormat.format("Running discovery request for non-existing annotation type {0}.", annotationTypes));
+ AnalysisResponse resp = cc.startRequest(req);
+ Assert.assertTrue(resp.isInvalidRequest());
+ Assert.assertEquals("Unexpected error message.", "No suitable discovery services found to create the requested annotation types.", resp.getDetails());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestCancellationTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestCancellationTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestCancellationTest.java
new file mode 100755
index 0000000..fd39e15
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestCancellationTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import java.util.Collections;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisCancelResult;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+
+public class AnalysisRequestCancellationTest extends ODFTestcase {
+
+ Logger logger = ODFTestLogger.get();
+
+ AnalysisRequestTracker generateTracker(String id, STATUS status) {
+ AnalysisRequestTracker tracker = new AnalysisRequestTracker();
+ Utils.setCurrentTimeAsLastModified(tracker);
+ tracker.setNextDiscoveryServiceRequest(0);
+ AnalysisRequest req = new AnalysisRequest();
+ req.setId(id);
+ MetaDataObjectReference ref = new MetaDataObjectReference();
+ ref.setId("DataSet" + id);
+ req.setDataSets(Collections.singletonList(ref));
+ tracker.setRequest(req);
+ tracker.setStatus(status);
+ return tracker;
+ }
+
+ @Test
+ public void testRequestCancellationNotFoundFailure() {
+ ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class);
+ AnalysisCancelResult cancelRequest = cc.cancelRequest("dummy_id");
+ Assert.assertEquals(cancelRequest.getState(), AnalysisCancelResult.State.NOT_FOUND);
+ }
+
+ @Test
+ public void testRequestCancellationWrongStateFailure() {
+ ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class);
+ AnalysisRequestTrackerStore store = (new ODFInternalFactory()).create(AnalysisRequestTrackerStore.class);
+ String testId = "test_id1";
+ AnalysisRequestTracker tracker = null;
+ AnalysisCancelResult cancelRequest = null;
+
+ tracker = generateTracker(testId, STATUS.FINISHED);
+ store.store(tracker);
+ cancelRequest = cc.cancelRequest(testId);
+ Assert.assertEquals(cancelRequest.getState(), AnalysisCancelResult.State.INVALID_STATE);
+
+ tracker = generateTracker(testId, STATUS.ERROR);
+ store.store(tracker);
+ cancelRequest = cc.cancelRequest(testId);
+ Assert.assertEquals(cancelRequest.getState(), AnalysisCancelResult.State.INVALID_STATE);
+
+ tracker = generateTracker(testId, STATUS.CANCELLED);
+ store.store(tracker);
+ cancelRequest = cc.cancelRequest(testId);
+ Assert.assertEquals(cancelRequest.getState(), AnalysisCancelResult.State.INVALID_STATE);
+ }
+
+ @Test
+ public void testRequestCancellationSuccess() {
+ ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class);
+ AnalysisRequestTrackerStore store = (new ODFInternalFactory()).create(AnalysisRequestTrackerStore.class);
+ String testId = "test_id2";
+
+ AnalysisRequestTracker tracker = generateTracker(testId, STATUS.INITIALIZED);
+ store.store(tracker);
+ AnalysisCancelResult cancelRequest = cc.cancelRequest(testId);
+ Assert.assertEquals(AnalysisCancelResult.State.SUCCESS, cancelRequest.getState());
+
+ tracker = generateTracker(testId, STATUS.IN_DISCOVERY_SERVICE_QUEUE);
+ store.store(tracker);
+ cancelRequest = cc.cancelRequest(testId);
+ Assert.assertEquals(AnalysisCancelResult.State.SUCCESS, cancelRequest.getState());
+
+ tracker = generateTracker(testId, STATUS.DISCOVERY_SERVICE_RUNNING);
+ store.store(tracker);
+ cancelRequest = cc.cancelRequest(testId);
+ Assert.assertEquals(AnalysisCancelResult.State.SUCCESS, cancelRequest.getState());
+}
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestTrackerStoreTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestTrackerStoreTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestTrackerStoreTest.java
new file mode 100755
index 0000000..7eb46d8
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestTrackerStoreTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+
+public class AnalysisRequestTrackerStoreTest extends ODFTestcase {
+
+ Logger logger = ODFTestLogger.get();
+
+ AnalysisRequestTracker generateTracker(String id, STATUS status) {
+ AnalysisRequestTracker tracker = new AnalysisRequestTracker();
+ Utils.setCurrentTimeAsLastModified(tracker);
+ tracker.setNextDiscoveryServiceRequest(0);
+ AnalysisRequest req = new AnalysisRequest();
+ req.setId(id);
+ MetaDataObjectReference ref = new MetaDataObjectReference();
+ ref.setId("DataSet" + id);
+ req.setDataSets(Collections.singletonList(ref));
+ tracker.setRequest(req);
+ tracker.setStatus(status);
+ return tracker;
+ }
+
+ @Test
+ public void testStore() throws Exception {
+ AnalysisRequestTrackerStore store = (new ODFInternalFactory()).create(AnalysisRequestTrackerStore.class);
+ assertNotNull(store);
+ int MAX_TRACKERS = 50;
+ List<AnalysisRequestTracker> trackers1 = new ArrayList<AnalysisRequestTracker>();
+ STATUS lastStatus = STATUS.IN_DISCOVERY_SERVICE_QUEUE;
+ for (int i = 0; i < MAX_TRACKERS; i++) {
+ trackers1.add(generateTracker("STORETEST_ID" + i, lastStatus));
+ }
+
+ logger.info("Storing " + MAX_TRACKERS + " Trackers");
+ long pass1Start = System.currentTimeMillis();
+ for (AnalysisRequestTracker tracker : trackers1) {
+ store.store(tracker);
+ }
+ long pass1End = System.currentTimeMillis();
+
+ logger.info("Storing " + MAX_TRACKERS + " Trackers again with new status");
+
+ lastStatus = STATUS.FINISHED;
+ List<AnalysisRequestTracker> trackers2 = new ArrayList<AnalysisRequestTracker>();
+ for (int i = 0; i < MAX_TRACKERS; i++) {
+ trackers2.add(generateTracker("STORETEST_ID" + i, lastStatus));
+ }
+ long pass2Start = System.currentTimeMillis();
+ for (AnalysisRequestTracker tracker : trackers2) {
+ store.store(tracker);
+ }
+ long pass2End = System.currentTimeMillis();
+
+ Thread.sleep(2000);
+ logger.info("Querying and checking " + MAX_TRACKERS + " Trackers");
+
+ long queryStart = System.currentTimeMillis();
+
+ for (int i = 0; i < MAX_TRACKERS; i++) {
+ final String analysisRequestId = "STORETEST_ID" + i;
+ AnalysisRequestTracker tracker = store.query(analysisRequestId);
+ assertNotNull(tracker);
+ assertEquals(1, tracker.getRequest().getDataSets().size());
+ MetaDataObjectReference ref = new MetaDataObjectReference();
+ ref.setId("DataSet" + analysisRequestId);
+ assertEquals(tracker.getRequest().getDataSets().get(0), ref);
+ assertEquals(lastStatus, tracker.getStatus());
+ }
+ long queryEnd = System.currentTimeMillis();
+
+ System.out.println("First pass: " + (pass1End - pass1Start) + "ms, second pass: " + (pass2End - pass2Start) + "ms, query: " + (queryEnd - queryStart) + "ms");
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DeclarativeRequestMapperTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DeclarativeRequestMapperTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DeclarativeRequestMapperTest.java
new file mode 100755
index 0000000..347fb84
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DeclarativeRequestMapperTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceJavaEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.core.controlcenter.DeclarativeRequestMapper;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException;
+
+public class DeclarativeRequestMapperTest extends ODFTestBase {
+ final private static String SERVICE_CLASSNAME = "TestAsyncDiscoveryService1";
+ final private static String[] EXPECTED_SERVICE_SEQUENCES = new String[] { "pre3,ser1", "alt1,ser1", "pre4,pre1,ser1",
+ "pre3,ser1,ser3", "pre3,ser1,ser5", "alt1,ser1,ser3", "alt1,ser1,ser5", "pre3,pre2,ser4", "alt1,pre2,ser4",
+ "pre4,pre1,ser1,ser3", "pre4,pre1,ser1,ser5", "pre3,ser1,alt1,ser3", "pre3,ser1,pre2,ser4", "pre3,ser1,alt1,ser5" };
+ private Logger logger = Logger.getLogger(ControlCenter.class.getName());
+
+ private static void createDiscoveryService(String serviceId, String[] resultingAnnotationTypes, String[] prerequisiteAnnotationTypes, String[] supportedObjectTypes) throws ValidationException, JSONException {
+ DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager();
+ DiscoveryServiceProperties dsProperties = new DiscoveryServiceProperties();
+ DiscoveryServiceJavaEndpoint dse = new DiscoveryServiceJavaEndpoint();
+ dse.setClassName(SERVICE_CLASSNAME);
+ dsProperties.setEndpoint(JSONUtils.convert(dse, DiscoveryServiceEndpoint.class));
+ dsProperties.setId(serviceId);
+ dsProperties.setName(serviceId + " Discovery Service");
+ dsProperties.setPrerequisiteAnnotationTypes(Arrays.asList(prerequisiteAnnotationTypes));
+ dsProperties.setResultingAnnotationTypes(Arrays.asList(resultingAnnotationTypes));
+ dsProperties.setSupportedObjectTypes(Arrays.asList(supportedObjectTypes));
+ discoveryServicesManager.createDiscoveryService(dsProperties);
+ }
+
+ private void deleteDiscoveryService(String serviceId, boolean failOnError) throws ValidationException {
+ DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager();
+ try {
+ discoveryServicesManager.deleteDiscoveryService(serviceId);
+ }
+ catch (ServiceNotFoundException e) {
+ if (failOnError) {
+ Assert.fail("Error deleting discovery services.");
+ }
+ }
+ }
+
+ private void deleteDiscoveryServices(boolean failOnError) throws ValidationException {
+ List<String> serviceIds = Arrays.asList(new String[] { "ser1", "ser2", "ser3", "ser4", "ser5", "pre1", "pre2", "pre3", "pre4", "alt1" });
+ for (String serviceId : serviceIds) {
+ deleteDiscoveryService(serviceId, failOnError);
+ }
+ }
+
+ private void createDiscoveryServices() throws ValidationException, JSONException {
+ createDiscoveryService("ser1", new String[] { "an1", "com1", "com2" }, new String[] { "pre1" }, new String[] { "Table", "DataFile" });
+ createDiscoveryService("ser2", new String[] { "an2", "com1" }, new String[] { "pre2" }, new String[] { "Table", "DataFile" });
+ createDiscoveryService("ser3", new String[] { "com2" }, new String[] { "pre1" }, new String[] { "Table", "DataFile" });
+ createDiscoveryService("ser4", new String[] { "an1", "com1", "com2" }, new String[] { "pre1", "pre2" }, new String[] { "Table", "DataFile" });
+ createDiscoveryService("ser5", new String[] { "com1", "com2" }, new String[] { "pre1" }, new String[] { "Table", "DataFile" });
+
+ createDiscoveryService("pre1", new String[] { "pre1" }, new String[] { "pre4" }, new String[] { "Table", "DataFile" });
+ createDiscoveryService("pre2", new String[] { "pre2" }, new String[] { }, new String[] { "Table", "DataFile" });
+ createDiscoveryService("pre3", new String[] { "pre1" }, new String[] { }, new String[] { "Table", "DataFile" });
+ createDiscoveryService("pre4", new String[] { "pre4" }, new String[] { }, new String[] { "Table", "DataFile" });
+
+ createDiscoveryService("alt1", new String[] { "pre1" }, new String[] { }, new String[] { "Table", "DataFile" });
+ }
+
+ @Test
+ public void testDiscoveryServiceSequences() throws Exception {
+ deleteDiscoveryServices(false);
+ createDiscoveryServices();
+
+ AnalysisRequest request = new AnalysisRequest();
+ request.setAnnotationTypes(Arrays.asList( new String[] { "an1", "com2" }));
+ DeclarativeRequestMapper mapper = new DeclarativeRequestMapper(request);
+ logger.log(Level.INFO, "Printing list of mapper result to stdout.");
+ int i = 0;
+ for (DeclarativeRequestMapper.DiscoveryServiceSequence discoveryApproach : mapper.getDiscoveryServiceSequences()) {
+ String sequence = Utils.joinStrings(new ArrayList<String>(discoveryApproach.getServiceSequence()), ',');
+ System.out.println(sequence);
+ if (i < EXPECTED_SERVICE_SEQUENCES.length) {
+ Assert.assertTrue(sequence.equals(EXPECTED_SERVICE_SEQUENCES[i++]));
+ }
+ }
+ Assert.assertEquals("Number of calculated discovery service sequences does not match expected value.", 36, mapper.getDiscoveryServiceSequences().size());
+
+ deleteDiscoveryServices(true);
+ }
+
+ @Test
+ public void testRecommendedDiscoveryServiceSequence() throws Exception {
+ deleteDiscoveryServices(false);
+ createDiscoveryServices();
+
+ AnalysisRequest request = new AnalysisRequest();
+ request.setAnnotationTypes(Arrays.asList( new String[] { "com2", "pre4" }));
+ DeclarativeRequestMapper mapper = new DeclarativeRequestMapper(request);
+ Assert.assertEquals("Recommended sequence does not match expected string.", "pre4,pre1,ser1", Utils.joinStrings(mapper.getRecommendedDiscoveryServiceSequence(), ','));
+
+ deleteDiscoveryServices(true);
+ }
+
+ @Test
+ public void testRemoveFailingService() throws Exception {
+ deleteDiscoveryServices(false);
+ createDiscoveryServices();
+
+ AnalysisRequest request = new AnalysisRequest();
+ request.setAnnotationTypes(Arrays.asList(new String[] { "an1", "com2" }));
+ DeclarativeRequestMapper mapper = new DeclarativeRequestMapper(request);
+ Assert.assertEquals("Original sequence does not match expected string.", EXPECTED_SERVICE_SEQUENCES[0], Utils.joinStrings(mapper.getRecommendedDiscoveryServiceSequence(), ','));
+
+ mapper.removeDiscoveryServiceSequences("ser1");
+ Assert.assertEquals("Updated sequence does not match expected string.", "pre3,pre2,ser4", Utils.joinStrings(mapper.getRecommendedDiscoveryServiceSequence(), ','));
+
+ deleteDiscoveryServices(true);
+ }
+
+ @Test
+ public void testRequestWithManyAnnotationTypes() throws Exception {
+ deleteDiscoveryServices(false);
+ createDiscoveryServices();
+
+ AnalysisRequest request = new AnalysisRequest();
+ request.setAnnotationTypes(Arrays.asList(new String[] { "an1", "an2", "com1", "com2", "pre1", "pre2", "pre4" }));
+ DeclarativeRequestMapper mapper = new DeclarativeRequestMapper(request);
+ Assert.assertEquals("Number of calculated discovery service sequences does not match expected value.", 75, mapper.getDiscoveryServiceSequences().size());
+
+ deleteDiscoveryServices(true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DefaultThreadManagerTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DefaultThreadManagerTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DefaultThreadManagerTest.java
new file mode 100755
index 0000000..96a4fee
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DefaultThreadManagerTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.engine.ThreadStatus;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.ExecutorServiceFactory;
+import org.apache.atlas.odf.core.controlcenter.ODFRunnable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.TimerTestBase;
+
+public class DefaultThreadManagerTest extends TimerTestBase {
+
+ int threadMS = 100;
+ int waitMS = 5000;
+
+ Logger logger = ODFTestLogger.get();
+
+ class TestRunnable implements ODFRunnable {
+
+ String id;
+ boolean cancelled = false;
+ long msToWaitBeforeFinish;
+
+ public TestRunnable(String id, long msToWaitBeforeFinish) {
+ this.id = id;
+ this.msToWaitBeforeFinish = msToWaitBeforeFinish;
+ }
+
+ public TestRunnable(String id) {
+ this(id, threadMS);
+ }
+
+ @Override
+ public void run() {
+ logger.info("Starting thread with ID: " + id);
+ try {
+ Thread.sleep(msToWaitBeforeFinish);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ logger.info("Thread finished with ID: " + id);
+
+ }
+
+ @Override
+ public void setExecutorService(ExecutorService service) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void cancel() {
+ cancelled = true;
+ }
+
+ @Override
+ public boolean isReady() {
+ return true;
+ }
+
+ }
+
+ @Test
+ public void testSimple() throws Exception {
+ ODFInternalFactory f = new ODFInternalFactory();
+ ThreadManager tm = f.create(ThreadManager.class);
+ tm.setExecutorService(f.create(ExecutorServiceFactory.class).createExecutorService());
+ assertNotNull(tm);
+
+ String id1 = "id1";
+ String id2 = "id2";
+
+ // start id1
+ ThreadStatus.ThreadState st = tm.getStateOfUnmanagedThread(id1);
+ Assert.assertEquals(ThreadStatus.ThreadState.NON_EXISTENT, st);
+
+ boolean b = tm.startUnmanagedThread(id1, new TestRunnable(id1)).isNewThreadCreated();
+ assertTrue(b);
+ b = tm.startUnmanagedThread(id1, new TestRunnable(id1)).isNewThreadCreated();
+ assertFalse(b);
+
+ st = tm.getStateOfUnmanagedThread(id1);
+ Assert.assertEquals(ThreadStatus.ThreadState.RUNNING, st);
+
+ // start id2
+ st = tm.getStateOfUnmanagedThread(id2);
+ Assert.assertEquals(ThreadStatus.ThreadState.NON_EXISTENT, st);
+
+ b = tm.startUnmanagedThread(id2, new TestRunnable(id2)).isNewThreadCreated();
+ assertTrue(b);
+ b = tm.startUnmanagedThread(id2, new TestRunnable(id2)).isNewThreadCreated();
+ assertFalse(b);
+
+ Thread.sleep(waitMS);
+ st = tm.getStateOfUnmanagedThread(id1);
+ Assert.assertEquals(ThreadStatus.ThreadState.FINISHED, st);
+ b = tm.startUnmanagedThread(id1, new TestRunnable(id1)).isNewThreadCreated();
+ assertTrue(b);
+
+ st = tm.getStateOfUnmanagedThread(id2);
+ // id2 should be removed from thread list
+ Assert.assertTrue(ThreadStatus.ThreadState.FINISHED.equals(st) || ThreadStatus.ThreadState.NON_EXISTENT.equals(st));
+
+ tm.shutdownThreads(Arrays.asList("id1", "id2"));
+ }
+
+ @Test
+ public void testManyThreads() throws Exception {
+ ODFInternalFactory f = new ODFInternalFactory();
+ ThreadManager tm = f.create(ThreadManager.class);
+ tm.setExecutorService(f.create(ExecutorServiceFactory.class).createExecutorService());
+
+ assertNotNull(tm);
+
+ List<String> threadIds = new ArrayList<>();
+ int THREAD_NUM = 20;
+ for (int i = 0; i < THREAD_NUM; i++) {
+ String id = "ThreadID" + i;
+ threadIds.add(id);
+ ThreadStatus.ThreadState st = tm.getStateOfUnmanagedThread(id);
+ Assert.assertEquals(ThreadStatus.ThreadState.NON_EXISTENT, st);
+
+ boolean b = tm.startUnmanagedThread(id, new TestRunnable(id)).isNewThreadCreated();
+ assertTrue(b);
+ b = tm.startUnmanagedThread(id, new TestRunnable(id)).isNewThreadCreated();
+ assertFalse(b);
+
+ st = tm.getStateOfUnmanagedThread(id);
+ Assert.assertEquals(ThreadStatus.ThreadState.RUNNING, st);
+
+ }
+ logger.info("All threads scheduled");
+
+ Thread.sleep(waitMS);
+
+ for (int i = 0; i < THREAD_NUM; i++) {
+ String id = "ThreadID" + i;
+ ThreadStatus.ThreadState st = tm.getStateOfUnmanagedThread(id);
+ Assert.assertEquals(ThreadStatus.ThreadState.FINISHED, st);
+ }
+ tm.shutdownThreads(threadIds);
+
+ }
+
+}