You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/06/28 05:57:23 UTC
[10/25] incubator-atlas git commit: ATLAS-1898: initial commit of ODF
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ODFAPITest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ODFAPITest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ODFAPITest.java
new file mode 100755
index 0000000..900c214
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ODFAPITest.java
@@ -0,0 +1,373 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.DataSet;
+import org.apache.atlas.odf.api.metadata.models.UnknownDataSet;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore;
+import org.apache.atlas.odf.core.metadata.DefaultMetadataStore;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisCancelResult;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+
+public class ODFAPITest extends ODFTestBase {
+
+ // Sleep time in milliseconds between two consecutive status polls.
+ public static int WAIT_MS_BETWEEN_POLLING = 500;
+ // Default maximum number of status polls used by the polling helpers of this class.
+ public static int MAX_NUMBER_OF_POLLS = 500;
+ // createAnalysisRequest() creates a dummy data set for every data set ID starting with one of these prefixes.
+ public static String DUMMY_SUCCESS_ID = "success";
+ public static String DUMMY_ERROR_ID = "error";
+
+ /**
+  * Convenience overload for a single data set: wraps the ID in a one-element list and
+  * delegates to {@link #runRequestAndCheckResult(List, AnalysisRequestStatus.State, int)}.
+  */
+ public static void runRequestAndCheckResult(String dataSetID, AnalysisRequestStatus.State expectedState, int expectedProcessedDiscoveryRequests) throws Exception {
+     List<String> singleDataSet = Collections.singletonList(dataSetID);
+     runRequestAndCheckResult(singleDataSet, expectedState, expectedProcessedDiscoveryRequests);
+ }
+
+ /**
+  * Starts an analysis for the given data sets, polls the request status until it is no
+  * longer ACTIVE, QUEUED or NOT_FOUND (at most MAX_NUMBER_OF_POLLS polls) and then asserts
+  * the final state and the number of discovery service responses stored in the tracker.
+  *
+  * @param dataSetIDs IDs of the data sets to analyze
+  * @param expectedState state the request has to end in
+  * @param expectedProcessedDiscoveryRequests expected number of discovery service responses,
+  *        or -1 to expect one response per discovery service request of the tracker
+  */
+ public static void runRequestAndCheckResult(List<String> dataSetIDs, AnalysisRequestStatus.State expectedState, int expectedProcessedDiscoveryRequests) throws Exception{
+     AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
+     String id = runRequest(dataSetIDs, analysisManager);
+     log.info("Running request "+id+" on data sets: " + dataSetIDs);
+     AnalysisRequestStatus status;
+     boolean stillRunning;
+
+     int maxPolls = MAX_NUMBER_OF_POLLS;
+     do {
+         status = analysisManager.getAnalysisRequestStatus(id);
+         log.log(Level.INFO, "{4}th poll request for request ID ''{0}'' (expected state: ''{3}''): state: ''{1}'', details: ''{2}''", new Object[] { id, status.getState(), status.getDetails(),
+                 expectedState, (MAX_NUMBER_OF_POLLS-maxPolls) });
+         maxPolls--;
+         Thread.sleep(WAIT_MS_BETWEEN_POLLING);
+         AnalysisRequestStatus.State state = status.getState();
+         stillRunning = state == AnalysisRequestStatus.State.ACTIVE || state == AnalysisRequestStatus.State.QUEUED || state == AnalysisRequestStatus.State.NOT_FOUND;
+     } while (maxPolls > 0 && stillRunning);
+
+     log.log(Level.INFO, "Polling result after {0} polls for request id {1}: status: {2}", new Object[] {(MAX_NUMBER_OF_POLLS-maxPolls), id, status.getState()});
+
+     // Only a request that is still running counts as a timeout; reaching the final
+     // state exactly on the last allowed poll must not fail the test.
+     Assert.assertFalse("Request " + id + " did not reach a final state within " + MAX_NUMBER_OF_POLLS + " polls", stillRunning);
+     Assert.assertEquals(expectedState, status.getState());
+     AnalysisRequestTrackerStore store = new ODFInternalFactory().create(AnalysisRequestTrackerStore.class);
+     AnalysisRequestTracker tracker = store.query(id);
+     Assert.assertNotNull(tracker);
+     checkTracker(tracker, expectedProcessedDiscoveryRequests);
+     log.info("Status details: " + status.getDetails());
+ }
+
+ /**
+  * Asserts that the tracker holds the expected number of discovery service responses.
+  * An expected value of -1 means "as many responses as the tracker has discovery service requests".
+  */
+ static void checkTracker(AnalysisRequestTracker tracker, int expectedProcessedDiscoveryRequests) {
+     int expectedResponses = (expectedProcessedDiscoveryRequests == -1)
+             ? tracker.getDiscoveryServiceRequests().size()
+             : expectedProcessedDiscoveryRequests;
+     int actualResponses = tracker.getDiscoveryServiceResponses().size();
+     Assert.assertEquals(expectedResponses, actualResponses);
+ }
+
+ /**
+  * Convenience overload for a single data set: wraps the ID in a one-element list and
+  * delegates to {@link #runRequest(List, AnalysisManager)}.
+  */
+ static String runRequest(String dataSetID, AnalysisManager analysisManager) throws Exception {
+     List<String> singleDataSet = Collections.singletonList(dataSetID);
+     return runRequest(singleDataSet, analysisManager);
+ }
+
+ /**
+  * Creates an analysis request for the given data sets, submits it and returns the ID of
+  * the started request. Asserts that a response was returned, that the request was not
+  * rejected as invalid and that an ID was assigned.
+  */
+ public static String runRequest(List<String> dataSetIDs, AnalysisManager analysisManager) throws Exception {
+     final AnalysisRequest analysisRequest = createAnalysisRequest(dataSetIDs);
+     log.info("Starting analyis");
+     final AnalysisResponse analysisResponse = analysisManager.runAnalysis(analysisRequest);
+     Assert.assertNotNull(analysisResponse);
+     Assert.assertFalse(analysisResponse.isInvalidRequest());
+     final String requestId = analysisResponse.getId();
+     Assert.assertNotNull(requestId);
+     return requestId;
+ }
+
+
+ /** A single data set with a "success" ID must finish, with one response per discovery service request. */
+ @Test
+ public void testSimpleSuccess() throws Exception {
+     final int oneResponsePerRequest = -1; // special value understood by checkTracker()
+     runRequestAndCheckResult("successID", AnalysisRequestStatus.State.FINISHED, oneResponsePerRequest);
+ }
+
+ /** Waits for the given request using the default poll limit MAX_NUMBER_OF_POLLS. */
+ public static void waitForRequest(String requestId, AnalysisManager analysisManager) {
+     final int pollLimit = MAX_NUMBER_OF_POLLS;
+     waitForRequest(requestId, analysisManager, pollLimit);
+ }
+
+ /**
+  * Polls the status of a request until it is no longer ACTIVE, QUEUED or NOT_FOUND, or
+  * until {@code maxPolls} polls have been made, sleeping WAIT_MS_BETWEEN_POLLING
+  * milliseconds after each poll. The outcome is only logged, nothing is asserted.
+  *
+  * @param requestId ID of the request to wait for
+  * @param analysisManager manager used to query the request status
+  * @param maxPolls maximum number of polls; at least one poll is always made
+  * @throws RuntimeException if the thread is interrupted while sleeping
+  */
+ public static void waitForRequest(String requestId, AnalysisManager analysisManager, int maxPolls) {
+     AnalysisRequestStatus status;
+     boolean stillRunning;
+
+     log.log(Level.INFO, "Waiting for request ''{0}'' to finish", requestId);
+     do {
+         status = analysisManager.getAnalysisRequestStatus(requestId);
+
+         log.log(Level.INFO, "Poll request for request ID ''{0}'', state: ''{1}'', details: ''{2}''", new Object[] { requestId, status.getState(), status.getDetails() });
+         maxPolls--;
+         try {
+             Thread.sleep(WAIT_MS_BETWEEN_POLLING);
+         } catch (InterruptedException e) {
+             // Restore the interrupt flag so that code further up the stack can still see it
+             Thread.currentThread().interrupt();
+             throw new RuntimeException(e);
+         }
+         AnalysisRequestStatus.State state = status.getState();
+         stillRunning = state == AnalysisRequestStatus.State.ACTIVE || state == AnalysisRequestStatus.State.QUEUED || state == AnalysisRequestStatus.State.NOT_FOUND;
+     } while (maxPolls > 0 && stillRunning);
+     if (stillRunning) {
+         // The message is a MessageFormat pattern, so the apostrophe has to be doubled
+         log.log(Level.INFO, "Request ''{0}'' is not finished yet, don''t wait for it", requestId);
+     } else {
+         log.log(Level.INFO, "Request ''{0}'' is finished with state: ''{1}''", new Object[] { requestId, status.getState() });
+     }
+ }
+
+ /**
+  * Polls the status of a request until it is no longer ACTIVE, QUEUED or NOT_FOUND, or
+  * until {@code maxPolls} polls have been made, sleeping WAIT_MS_BETWEEN_POLLING
+  * milliseconds after each poll.
+  *
+  * @param requestId ID of the request to wait for
+  * @param analysisManager manager used to query the request status
+  * @param maxPolls maximum number of polls; at least one poll is always made
+  * @param expectedState state to compare the last polled state with
+  * @return {@code true} if the last polled state equals {@code expectedState}
+  * @throws RuntimeException if the thread is interrupted while sleeping
+  */
+ public static boolean waitForRequest(String requestId, AnalysisManager analysisManager, int maxPolls, AnalysisRequestStatus.State expectedState) {
+     AnalysisRequestStatus status;
+     boolean stillRunning;
+
+     log.log(Level.INFO, "Waiting for request ''{0}'' to finish", requestId);
+     do {
+         status = analysisManager.getAnalysisRequestStatus(requestId);
+         log.log(Level.INFO, "Poll request for request ID ''{0}'', state: ''{1}'', details: ''{2}''", new Object[] { requestId, status.getState(), status.getDetails() });
+         maxPolls--;
+         try {
+             Thread.sleep(WAIT_MS_BETWEEN_POLLING);
+         } catch (InterruptedException e) {
+             // Restore the interrupt flag so that code further up the stack can still see it
+             Thread.currentThread().interrupt();
+             throw new RuntimeException(e);
+         }
+         AnalysisRequestStatus.State state = status.getState();
+         stillRunning = state == AnalysisRequestStatus.State.ACTIVE || state == AnalysisRequestStatus.State.QUEUED || state == AnalysisRequestStatus.State.NOT_FOUND;
+     } while (maxPolls > 0 && stillRunning);
+     if (stillRunning) {
+         // The message is a MessageFormat pattern, so the apostrophe has to be doubled
+         log.log(Level.INFO, "Request ''{0}'' is not finished yet, don''t wait for it", requestId);
+     }
+     return expectedState.equals(status.getState());
+ }
+
+
+ /**
+  * Submits the same data set twice in a row and once more after waiting longer than twice
+  * DefaultStatusQueueStore.IGNORE_SIMILAR_REQUESTS_TIMESPAN_MS, and asserts that the
+  * returned request IDs differ.
+  */
+ @Test
+ public void testSimpleSuccessDuplicate() throws Exception {
+     final AnalysisManager manager = new ODFFactory().create().getAnalysisManager();
+     final String firstId = runRequest("successID", manager);
+     final String secondId = runRequest("successID", manager);
+     Assert.assertNotEquals(firstId, secondId);
+
+     // Wait past the limit, then check that a new analysis is started
+     Thread.sleep(DefaultStatusQueueStore.IGNORE_SIMILAR_REQUESTS_TIMESPAN_MS*2 + 5000);
+     final String thirdId = runRequest("successID", manager);
+     Assert.assertNotEquals(secondId, thirdId);
+
+     waitForRequest(firstId, manager);
+     waitForRequest(thirdId, manager);
+ }
+
+ /** Requests for two different data sets must get different request IDs. */
+ @Test
+ public void testSimpleSuccessNoDuplicate() throws Exception {
+     final AnalysisManager manager = new ODFFactory().create().getAnalysisManager();
+     final String firstId = runRequest("successID", manager);
+     final String otherId = runRequest("successID2", manager);
+     Assert.assertNotEquals(firstId, otherId);
+
+     waitForRequest(firstId, manager);
+     waitForRequest(otherId, manager);
+ }
+
+ /**
+  * Submits a request for three data sets, then requests for single data sets out of that
+  * list (one immediately, one after the "ignore similar requests" time span plus five
+  * seconds) and asserts that the returned request IDs differ.
+  */
+ @Test
+ public void testSimpleSuccessDuplicateSubset() throws Exception {
+     final AnalysisManager manager = new ODFFactory().create().getAnalysisManager();
+     final List<String> allDataSets = Arrays.asList("successID", "successID2", "successID3");
+     final String compoundId = runRequest(allDataSets, manager);
+     final String secondId = runRequest("successID2", manager);
+     Assert.assertNotEquals(compoundId, secondId);
+
+     Thread.sleep(DefaultStatusQueueStore.IGNORE_SIMILAR_REQUESTS_TIMESPAN_MS + 5000);
+     final String thirdId = runRequest("successID", manager);
+     Assert.assertNotEquals(secondId, thirdId);
+
+     waitForRequest(compoundId, manager);
+     waitForRequest(thirdId, manager);
+ }
+
+ /**
+  * This test depends on the speed of execution.
+  * An analysis that is not in state INITIALIZED or IN_SERVICE_QUEUE cannot be cancelled.
+  * Therefore if the analysis is started too quickly this test will fail!
+  *
+  * Ignore for now as this can go wrong in the build.
+  */
+ @Test
+ @Ignore
+ public void testCancelRequest() throws Exception {
+     AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
+     String id = runRequest(Arrays.asList("successID", "successID2", "successID3"), analysisManager);
+     AnalysisCancelResult cancelAnalysisRequest = analysisManager.cancelAnalysisRequest(id);
+     // JUnit expects (expected, actual); the other order yields a misleading failure message
+     Assert.assertEquals(AnalysisCancelResult.State.SUCCESS, cancelAnalysisRequest.getState());
+     String secondId = runRequest("successID2", analysisManager);
+     Assert.assertNotEquals(id, secondId);
+ }
+
+
+ /** A request for three "success" data sets must finish with six discovery service responses. */
+ @Test
+ public void testRequestsWithDataSetListSuccess() throws Exception {
+     final List<String> dataSets = Arrays.asList("success1", "success2", "success3");
+     runRequestAndCheckResult(dataSets, AnalysisRequestStatus.State.FINISHED, 6);
+ }
+
+ @Test
+ public void testRequestsWithDataSetListError() throws Exception {
+ runRequestAndCheckResult(Arrays.asList("success1", "error2", "success3"), AnalysisRequestStatus.State.ERROR, 3);
+ }
+
+
+
+ @Test
+ public void testSimpleFailure() throws Exception {
+ runRequestAndCheckResult("errorID", AnalysisRequestStatus.State.ERROR, 1);
+ }
+
+ @Test
+ public void testManyRequests() throws Exception {
+ List<String> dataSets = new ArrayList<String>();
+ List<AnalysisRequestStatus.State> expectedStates = new ArrayList<AnalysisRequestStatus.State>();
+ int dataSetNum = 5;
+ for (int i=0; i<dataSetNum; i++) {
+ AnalysisRequestStatus.State expectedState = AnalysisRequestStatus.State.FINISHED;
+ String dataSet = "successdataSet" + i;
+ if (i % 3 == 0) {
+ // every third data set should fail
+ dataSet = "errorDataSet" + i;
+ expectedState = AnalysisRequestStatus.State.ERROR;
+ }
+ dataSets.add(dataSet);
+ expectedStates.add(expectedState);
+ }
+
+ runRequests(dataSets, expectedStates);
+ }
+
+ /**
+  * Starts one analysis request per data set, polls all of them in up to ten passes until
+  * each has left the ACTIVE / QUEUED / NOT_FOUND states, and asserts that every request
+  * ended in the state found at the same index of {@code expectedStates}.
+  *
+  * @param dataSetIDs IDs of the data sets; one request is started per ID
+  * @param expectedStates expected final state per data set, parallel to {@code dataSetIDs}
+  */
+ public void runRequests(List<String> dataSetIDs, List<AnalysisRequestStatus.State> expectedStates) throws Exception {
+     Assert.assertEquals(dataSetIDs.size(), expectedStates.size());
+     AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
+
+     Map<AnalysisRequest, AnalysisRequestStatus.State> request2ExpectedState = new HashMap<AnalysisRequest, AnalysisRequestStatus.State>();
+
+     for (int i = 0; i < dataSetIDs.size(); i++) {
+         String dataSetID = dataSetIDs.get(i);
+         AnalysisRequestStatus.State expectedState = expectedStates.get(i);
+
+         AnalysisRequest request = createAnalysisRequest(Collections.singletonList(dataSetID));
+
+         log.info("Starting analyis");
+         AnalysisResponse response = analysisManager.runAnalysis(request);
+         Assert.assertNotNull(response);
+         String id = response.getId();
+         Assert.assertFalse(response.isInvalidRequest());
+         Assert.assertNotNull(id);
+         // Remember the assigned ID on the request so that it can be polled below
+         request.setId(id);
+         request2ExpectedState.put(request, expectedState);
+     }
+
+     Map<AnalysisRequest, AnalysisRequestStatus> actualFinalStatePerRequest = new HashMap<AnalysisRequest, AnalysisRequestStatus>();
+     int maxPollPasses = 10;
+     for (int i = 0; i < maxPollPasses; i++) {
+         log.info("Polling all requests for the " + i + " th time");
+         boolean allRequestsFinished = true;
+         for (Map.Entry<AnalysisRequest, AnalysisRequestStatus.State> entry : request2ExpectedState.entrySet()) {
+
+             AnalysisRequest request = entry.getKey();
+             String id = request.getId();
+             if (actualFinalStatePerRequest.containsKey(request)) {
+                 log.log(Level.INFO, "Request with ID ''{0}'' already finished, skipping it", id);
+                 continue;
+             }
+             allRequestsFinished = false;
+
+             AnalysisRequestStatus.State expectedState = entry.getValue();
+             AnalysisRequestStatus status;
+             boolean stillRunning;
+
+             int maxPollsPerRequest = 3;
+             do {
+                 status = analysisManager.getAnalysisRequestStatus(id);
+                 log.log(Level.INFO, "Poll request for request ID ''{0}'' (expected state: ''{3}''): state: ''{1}'', details: ''{2}''",
+                         new Object[] { id, status.getState(), status.getDetails(), expectedState });
+                 maxPollsPerRequest--;
+                 Thread.sleep(1000);
+                 AnalysisRequestStatus.State state = status.getState();
+                 stillRunning = state == AnalysisRequestStatus.State.ACTIVE || state == AnalysisRequestStatus.State.QUEUED || state == AnalysisRequestStatus.State.NOT_FOUND;
+             } while (maxPollsPerRequest > 0 && stillRunning);
+
+             if (!stillRunning) {
+                 // Final state found; decided by the state itself so that a result seen on
+                 // the last poll of this pass is not thrown away.
+                 actualFinalStatePerRequest.put(request, status);
+             }
+         }
+         if (allRequestsFinished) {
+             log.info("All requests finished");
+             break;
+         }
+     }
+     Assert.assertEquals(request2ExpectedState.size(), actualFinalStatePerRequest.size());
+     Assert.assertEquals(request2ExpectedState.keySet(), actualFinalStatePerRequest.keySet());
+     for (Map.Entry<AnalysisRequest, AnalysisRequestStatus> actual : actualFinalStatePerRequest.entrySet()) {
+         AnalysisRequest req = actual.getKey();
+         Assert.assertNotNull(req);
+         AnalysisRequestStatus.State expectedState = request2ExpectedState.get(req);
+         Assert.assertNotNull(expectedState);
+         AnalysisRequestStatus.State actualState = actual.getValue().getState();
+         Assert.assertNotNull(actualState);
+
+         log.log(Level.INFO, "Checking request ID ''{0}'', actual state: ''{1}'', expected state: ''{2}''", new Object[] { req.getId(), actualState, expectedState });
+         Assert.assertEquals(expectedState, actualState);
+     }
+ }
+
+ public static AnalysisRequest createAnalysisRequest(List<String> dataSetIDs) throws JSONException {
+ AnalysisRequest request = new AnalysisRequest();
+ List<MetaDataObjectReference> dataSetRefs = new ArrayList<>();
+ MetadataStore mds = new ODFFactory().create().getMetadataStore();
+ if (!(mds instanceof DefaultMetadataStore)) {
+ throw new RuntimeException(MessageFormat.format("This tests does not work with metadata store implementation \"{0}\" but only with the DefaultMetadataStore.", mds.getClass().getName()));
+ }
+ DefaultMetadataStore defaultMds = (DefaultMetadataStore) mds;
+ defaultMds.resetAllData();
+ for (String id : dataSetIDs) {
+ MetaDataObjectReference mdr = new MetaDataObjectReference();
+ mdr.setId(id);
+ dataSetRefs.add(mdr);
+ if (id.startsWith(DUMMY_SUCCESS_ID) || id.startsWith(DUMMY_ERROR_ID)) {
+ log.info("Creating dummy data set for reference : " + id.toString());
+ DataSet ds = new UnknownDataSet();
+ ds.setReference(mdr);
+ defaultMds.createObject(ds);
+ }
+ }
+ defaultMds.commit();
+ request.setDataSets(dataSetRefs);
+ List<String> serviceIds = Arrays.asList(new String[]{"asynctestservice", "synctestservice"});
+ /* use a fix list of services
+ List<DiscoveryServiceRegistrationInfo> registeredServices = new ODFFactory().create(ControlCenter.class).getConfig().getRegisteredServices();
+ for(DiscoveryServiceRegistrationInfo service : registeredServices){
+ serviceIds.add(service.getId());
+ }
+ */
+ request.setDiscoveryServiceSequence(serviceIds);
+ Map<String, Object> additionalProps = new HashMap<String, Object>();
+ additionalProps.put("aaa", "bbb");
+ JSONObject jo = new JSONObject();
+ jo.put("p1", "v1");
+ jo.put("p2", "v2");
+ additionalProps.put("jo", jo);
+ request.setAdditionalProperties(additionalProps);
+ return request;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ParallelODFTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ParallelODFTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ParallelODFTest.java
new file mode 100755
index 0000000..9aa3ba4
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ParallelODFTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.api.engine.EngineManager;
+import org.apache.atlas.odf.api.engine.SystemHealth;
+import org.apache.atlas.odf.api.engine.SystemHealth.HealthStatus;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.analysis.AnalysisManagerImpl;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+
+public class ParallelODFTest extends ODFTestcase {
+ Logger log = ODFTestLogger.get();
+
+ @Test
+ public void runDataSetsInParallelSuccess() throws Exception {
+ runDataSetsInParallelAndCheckResult(Arrays.asList(new String[] { "successID1", "successID2" }), State.FINISHED);
+ }
+
+ @Test
+ public void runDataSetsInParallelError() throws Exception {
+ runDataSetsInParallelAndCheckResult(Arrays.asList(new String[] { "successID1", "errorID2" }), State.ERROR);
+ }
+
+ private void runDataSetsInParallelAndCheckResult(List<String> dataSetIDs, State expectedState) throws Exception {
+ log.info("Running data sets in parallel: " + dataSetIDs);
+ log.info("Expected state: " + expectedState);
+ AnalysisRequest req = ODFAPITest.createAnalysisRequest(dataSetIDs);
+ // Enable parallel processing because this is a parallel test
+ req.setProcessDataSetsSequentially(false);
+ AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
+ EngineManager engineManager = new ODFFactory().create().getEngineManager();
+
+ SystemHealth healthCheckResult = engineManager.checkHealthStatus();
+ Assert.assertEquals(HealthStatus.OK, healthCheckResult.getStatus());
+ AnalysisResponse resp = analysisManager.runAnalysis(req);
+ log.info("Parallel requests started");
+
+ String id = resp.getId();
+ List<String> singleIds = Utils.splitString(id, AnalysisManagerImpl.COMPOUND_REQUEST_SEPARATOR);
+ List<String> singleDetails = Utils.splitString(resp.getDetails(), AnalysisManagerImpl.COMPOUND_REQUEST_SEPARATOR);
+ Assert.assertEquals(dataSetIDs.size(), singleIds.size());
+ Assert.assertEquals(dataSetIDs.size(), singleDetails.size());
+
+ AnalysisRequestStatus status = null;
+
+ // check that requests are processed in parallel:
+ // there must be a point in time where both requests are in status "active"
+ log.info("Polling for status of parallel request...");
+ boolean foundPointInTimeWhereBothRequestsAreActive = false;
+ int maxPolls = ODFAPITest.MAX_NUMBER_OF_POLLS;
+ do {
+ List<State> allSingleStates = new ArrayList<AnalysisRequestStatus.State>();
+ for (String singleId : singleIds) {
+ allSingleStates.add(analysisManager.getAnalysisRequestStatus(singleId).getState());
+ }
+ if (Utils.containsOnly(allSingleStates, new State[] { State.ACTIVE })) {
+ foundPointInTimeWhereBothRequestsAreActive = true;
+ }
+
+ status = analysisManager.getAnalysisRequestStatus(id);
+ log.log(Level.INFO, "Poll request for parallel request ID ''{0}'' (expected state: ''{3}''): state: ''{1}'', details: ''{2}''", new Object[] { id, status.getState(), status.getDetails(),
+ expectedState });
+ log.info("States of single requests: " + singleIds + ": " + allSingleStates);
+ maxPolls--;
+ Thread.sleep(ODFAPITest.WAIT_MS_BETWEEN_POLLING);
+ } while (maxPolls > 0 && (status.getState() == State.ACTIVE || status.getState() == State.QUEUED));
+
+ Assert.assertTrue(maxPolls > 0);
+ Assert.assertEquals(expectedState, status.getState());
+ Assert.assertTrue(foundPointInTimeWhereBothRequestsAreActive);
+ log.info("Parallel request status details: " + status.getDetails());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/SetTrackerStatusTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/SetTrackerStatusTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/SetTrackerStatusTest.java
new file mode 100755
index 0000000..9a43b78
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/SetTrackerStatusTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import java.util.logging.Level;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+
+/**
+ * Tests AnalysisRequestTrackerStore.setStatusOfOldRequest() against a request that is
+ * still being processed.
+ */
+public class SetTrackerStatusTest extends ODFTestBase {
+
+    /**
+     * Starts a request, sets the status of old requests to ERROR with a cut-off timestamp
+     * taken one second after the start, asserts that the tracker reports ERROR with the
+     * given message, and then asserts that the request still ends up FINISHED.
+     */
+    @Test
+    public void testSetTrackerStatus() throws Exception {
+        AnalysisManager am = new ODFFactory().create().getAnalysisManager();
+        AnalysisRequestTrackerStore arts = new ODFInternalFactory().create(AnalysisRequestTrackerStore.class);
+        String requestId = ODFAPITest.runRequest("successId", am);
+        Thread.sleep(1000);
+        long cutOffTimestamp = System.currentTimeMillis();
+        String testMessage = "Message was set to error at " + cutOffTimestamp;
+        arts.setStatusOfOldRequest(cutOffTimestamp, STATUS.ERROR, testMessage);
+        AnalysisRequestTracker tracker = arts.query(requestId);
+        Assert.assertEquals(STATUS.ERROR, tracker.getStatus());
+        Assert.assertEquals(testMessage, tracker.getStatusDetails());
+
+        // wait until request is finished and state is set back to finished
+        log.log(Level.INFO, "Waiting for request ''{0}'' to finish", requestId);
+        int maxPolls = ODFAPITest.MAX_NUMBER_OF_POLLS;
+        AnalysisRequestStatus status;
+        do {
+            status = am.getAnalysisRequestStatus(requestId);
+            log.log(Level.INFO, "Poll request for request ID ''{0}'', state: ''{1}'', details: ''{2}''", new Object[] { requestId, status.getState(), status.getDetails() });
+            maxPolls--;
+            // An interrupt aborts the test (the method declares "throws Exception")
+            // instead of being swallowed while the loop keeps polling.
+            Thread.sleep(ODFAPITest.WAIT_MS_BETWEEN_POLLING);
+        } while (maxPolls > 0 && (status.getState() != AnalysisRequestStatus.State.FINISHED) );
+
+        Assert.assertEquals(AnalysisRequestStatus.State.FINISHED, am.getAnalysisRequestStatus(requestId).getState());
+        tracker = arts.query(requestId);
+        Assert.assertEquals(STATUS.FINISHED, tracker.getStatus());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/DiscoveryServiceManagerTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/DiscoveryServiceManagerTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/DiscoveryServiceManagerTest.java
new file mode 100755
index 0000000..0f1aa8f
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/DiscoveryServiceManagerTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.discoveryservice;
+
+import java.io.InputStream;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceJavaEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRuntimeStatistics;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceStatus;
+import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException;
+
+public class DiscoveryServiceManagerTest {
+
+ // ID of the discovery service the read-only tests query
+ private static final String ASYNCTESTWA_SERVICE_ID = "asynctestservice-with-annotations";
+
+ // Properties of the service that testCreateUpdateDelete() registers
+ private static final String NEW_SERVICE_ID = "New_Service";
+ private static final String NEW_SERVICE_NAME = "Name of New Service";
+ private static final String NEW_SERVICE_DESCRIPTION = "Description of the New Service";
+ private static final String NEW_SERVICE_CLASSNAME = "TestAsyncDiscoveryService1";
+
+ // Values used when testCreateUpdateDelete() replaces the registered service
+ private static final String UPDATED_SERVICE_DESCRIPTION = "Updated description of the New Service";
+ private static final String UPDATED_SERVICE_CLASSNAME = "TestSyncDiscoveryService1";
+
+ /** Creates a discovery service with the given properties via the DiscoveryServiceManager. */
+ private void registerDiscoveryService(DiscoveryServiceProperties dsProperties) throws ValidationException {
+     new ODFFactory().create().getDiscoveryServiceManager().createDiscoveryService(dsProperties);
+ }
+
+ /** Replaces a discovery service with the given properties via the DiscoveryServiceManager. */
+ private void replaceDiscoveryService(DiscoveryServiceProperties dsProperties) throws ValidationException {
+     new ODFFactory().create().getDiscoveryServiceManager().replaceDiscoveryService(dsProperties);
+ }
+
+ /** Deletes the discovery service with the given ID via the DiscoveryServiceManager. */
+ private void unregisterDiscoveryService(String serviceId) throws ServiceNotFoundException, ValidationException {
+     new ODFFactory().create().getDiscoveryServiceManager().deleteDiscoveryService(serviceId);
+ }
+
+ /** The properties of the ASYNCTESTWA_SERVICE_ID service must be retrievable. */
+ @Test
+ public void testGetDiscoveryServiceProperties() throws ServiceNotFoundException {
+     final DiscoveryServiceManager manager = new ODFFactory().create().getDiscoveryServiceManager();
+     Assert.assertNotNull(manager.getDiscoveryServiceProperties(ASYNCTESTWA_SERVICE_ID));
+ }
+
+
+ /** The status of the ASYNCTESTWA_SERVICE_ID service must be retrievable. */
+ @Ignore // Ignoring testcase due to problem on Mac (issue #56)
+ @Test
+ public void testGetDiscoveryServiceStatus() throws ServiceNotFoundException {
+     final DiscoveryServiceManager manager = new ODFFactory().create().getDiscoveryServiceManager();
+     Assert.assertNotNull(manager.getDiscoveryServiceStatus(ASYNCTESTWA_SERVICE_ID));
+ }
+
+ /** Runtime statistics must be returned; the average processing time is currently expected to be 0. */
+ @Test // TODO: need to adjust as soon as runtime statistics are available
+ public void testGetDiscoveryServiceRuntimeStatistics() throws ServiceNotFoundException {
+     final DiscoveryServiceManager manager = new ODFFactory().create().getDiscoveryServiceManager();
+     final DiscoveryServiceRuntimeStatistics statistics = manager.getDiscoveryServiceRuntimeStatistics(ASYNCTESTWA_SERVICE_ID);
+     Assert.assertNotNull(statistics);
+     // Kept in a long local so that the comparison below stays a primitive one
+     final long averageMillisPerItem = statistics.getAverageProcessingTimePerItemInMillis();
+     Assert.assertEquals(0, averageMillisPerItem);
+ }
+
+ /** Deleting the runtime statistics of the ASYNCTESTWA_SERVICE_ID service must not throw. */
+ @Test
+ public void testDeleteDiscoveryServiceRuntimeStatistics() throws ServiceNotFoundException {
+     new ODFFactory().create().getDiscoveryServiceManager().deleteDiscoveryServiceRuntimeStatistics(ASYNCTESTWA_SERVICE_ID);
+ }
+
+ /** No image is expected for the ASYNCTESTWA_SERVICE_ID service. */
+ @Test
+ public void testGetDiscoveryServiceImage() throws ServiceNotFoundException {
+     final DiscoveryServiceManager manager = new ODFFactory().create().getDiscoveryServiceManager();
+     final InputStream image = manager.getDiscoveryServiceImage(ASYNCTESTWA_SERVICE_ID);
+     Assert.assertNull(image);
+ }
+
+ /**
+  * Registers a new discovery service, replaces it with an updated definition (different
+  * description and endpoint class) and finally deletes it again. The test passes when none
+  * of the three calls throws.
+  */
+ @Test
+ public void testCreateUpdateDelete() throws ServiceNotFoundException, ValidationException, JSONException {
+     DiscoveryServiceJavaEndpoint dse = new DiscoveryServiceJavaEndpoint();
+     dse.setClassName(NEW_SERVICE_CLASSNAME);
+     DiscoveryServiceProperties dsProperties = new DiscoveryServiceProperties();
+     dsProperties.setId(NEW_SERVICE_ID);
+     dsProperties.setName(NEW_SERVICE_NAME);
+     dsProperties.setDescription(NEW_SERVICE_DESCRIPTION);
+     dsProperties.setLink(null);
+     dsProperties.setPrerequisiteAnnotationTypes(null);
+     dsProperties.setResultingAnnotationTypes(null);
+     dsProperties.setSupportedObjectTypes(null);
+     dsProperties.setAssignedObjectTypes(null);
+     dsProperties.setAssignedObjectCandidates(null);
+     dsProperties.setEndpoint(JSONUtils.convert(dse, DiscoveryServiceEndpoint.class));
+     dsProperties.setParallelismCount(2);
+     registerDiscoveryService(dsProperties);
+
+     DiscoveryServiceJavaEndpoint dse2 = new DiscoveryServiceJavaEndpoint();
+     dse2.setClassName(UPDATED_SERVICE_CLASSNAME);
+     DiscoveryServiceProperties dsProperties2 = new DiscoveryServiceProperties();
+     dsProperties2.setId(NEW_SERVICE_ID);
+     dsProperties2.setName(NEW_SERVICE_NAME);
+     dsProperties2.setDescription(UPDATED_SERVICE_DESCRIPTION);
+     dsProperties2.setLink(null);
+     // These must configure the replacement properties, not the already registered ones
+     dsProperties2.setPrerequisiteAnnotationTypes(null);
+     dsProperties2.setResultingAnnotationTypes(null);
+     dsProperties2.setSupportedObjectTypes(null);
+     dsProperties2.setAssignedObjectTypes(null);
+     dsProperties2.setAssignedObjectCandidates(null);
+     dsProperties2.setEndpoint(JSONUtils.convert(dse2, DiscoveryServiceEndpoint.class));
+     dsProperties2.setParallelismCount(2);
+     replaceDiscoveryService(dsProperties2);
+
+     unregisterDiscoveryService(NEW_SERVICE_ID);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryService1.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryService1.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryService1.java
new file mode 100755
index 0000000..2ea85b7
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryService1.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.discoveryservice;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncRunStatus;
+import org.junit.Assert;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncStartResponse;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class TestAsyncDiscoveryService1 extends DiscoveryServiceBase implements AsyncDiscoveryService {
+
+ static int unavailableCounter = 0;
+
+ static Logger logger = ODFTestLogger.get();
+
+ public static void checkUserAndAdditionalProperties(DiscoveryServiceRequest request) {
+ String user = request.getUser();
+
+ String defaultUser = System.getProperty("user.name");
+ Assert.assertEquals(defaultUser, user);
+
+ Map<String, Object> additionalProperties = request.getAdditionalProperties();
+ logger.info("TestAsyncDiscoveryService1.startAnalysis additional properties: " + additionalProperties);
+ Assert.assertNotNull(additionalProperties);
+
+ // check that environment entries are also available additional properties
+ Environment ev = new ODFInternalFactory().create(Environment.class);
+ String dsId = request.getDiscoveryServiceId();
+ Map<String, String> serviceEnvProps = ev.getPropertiesWithPrefix(dsId);
+ if (!serviceEnvProps.isEmpty()) {
+ Assert.assertTrue(!additionalProperties.isEmpty());
+ for (Map.Entry<String, String> serviceEnvProp : serviceEnvProps.entrySet()) {
+ String key = serviceEnvProp.getKey();
+ String val = serviceEnvProp.getValue();
+ logger.info("Found discoveryservice configuration parameter: " + key + " with value " + val);
+ Assert.assertTrue(key.startsWith(dsId));
+ Assert.assertTrue(additionalProperties.containsKey(key) );
+ Assert.assertEquals(val, additionalProperties.get(key));
+ }
+ }
+
+ if (!additionalProperties.isEmpty()) {
+ Assert.assertTrue(additionalProperties.containsKey("aaa"));
+ Assert.assertTrue("bbb".equals(additionalProperties.get("aaa")));
+ Assert.assertTrue(additionalProperties.containsKey("jo"));
+ @SuppressWarnings("unchecked")
+ Map<String, Object> m = (Map<String, Object>) additionalProperties.get("jo");
+ Assert.assertTrue("v1".equals(m.get("p1")));
+ Assert.assertTrue("v2".equals(m.get("p2")));
+ /*
+ if (!additionalProperties.containsKey("aaa")) {
+ response.setCode(ResponseCode.UNKNOWN_ERROR);
+ response.setDetails("Additional property value 'aaa' doesn't exist");
+ return;
+ }
+ if (!"bbb".equals(additionalProperties.get("aaa"))) {
+ response.setCode(ResponseCode.UNKNOWN_ERROR);
+ response.setDetails("Additional properties 'aaa' has wrong value");
+ return;
+ }
+ if (!additionalProperties.containsKey("jo")) {
+ response.setCode(ResponseCode.UNKNOWN_ERROR);
+ response.setDetails("Additional property value 'jo' doesn't exist");
+ return;
+ }
+ Map m = (Map) additionalProperties.get("jo");
+ if (!"v1".equals(m.get("p1"))) {
+ response.setCode(ResponseCode.UNKNOWN_ERROR);
+ response.setDetails("Additional property value 'jo.p1' doesn't exist");
+ return;
+
+ }
+ if (!"v2".equals(m.get("p2"))) {
+ response.setCode(ResponseCode.UNKNOWN_ERROR);
+ response.setDetails("Additional property value 'jo.p2' doesn't exist");
+ return;
+ }
+ */
+ }
+ }
+
+ @Override
+ public DiscoveryServiceAsyncStartResponse startAnalysis(DiscoveryServiceRequest request) {
+ try {
+ DiscoveryServiceResponse.ResponseCode code = DiscoveryServiceResponse.ResponseCode.TEMPORARILY_UNAVAILABLE;
+ String details = "Cannot answer right now";
+ if (unavailableCounter % 2 == 0) {
+ code = DiscoveryServiceResponse.ResponseCode.OK;
+ details = "Everything's peachy";
+ }
+ unavailableCounter++;
+ /*
+ if (unavailableCounter % 3 == 0) {
+ code = CODE.NOT_AUTHORIZED;
+ details = "You have no power here!";
+ }
+ */
+ DiscoveryServiceAsyncStartResponse response = new DiscoveryServiceAsyncStartResponse();
+ response.setCode(code);
+ response.setDetails(details);
+ if (code == DiscoveryServiceResponse.ResponseCode.OK) {
+ String runid = "TestAsyncService1" + UUID.randomUUID().toString();
+ synchronized (lock) {
+ runIDsRunning.put(runid, 4); // return status "running" 4 times before finishing
+ }
+ response.setRunId(runid);
+ String dataSetId = request.getDataSetContainer().getDataSet().getReference().getId();
+ if (dataSetId.startsWith("error")) {
+ logger.info("TestAsync Discovery Service run " + runid + " will fail");
+ runIDsWithError.add(runid);
+ } else {
+ logger.info("TestAsync Discovery Service run " + runid + " will succeed");
+ }
+ }
+ logger.info("TestAsyncDiscoveryService1.startAnalysis returns: " + JSONUtils.lazyJSONSerializer(response));
+ checkUserAndAdditionalProperties(request);
+ /*
+ String user = request.getUser();
+ Assert.assertEquals(TestControlCenter.TEST_USER_ID, user);
+
+ Map<String, Object> additionalProperties = request.getAdditionalProperties();
+ logger.info("TestAsyncDiscoveryService1.startAnalysis additional properties: " + additionalProperties);
+ Assert.assertNotNull(additionalProperties);
+ if (!additionalProperties.isEmpty()) {
+ if (!additionalProperties.containsKey("aaa")) {
+ response.setCode(ResponseCode.UNKNOWN_ERROR);
+ response.setDetails("Additional property value 'aaa' doesn't exist");
+ return response;
+ }
+ if (!"bbb".equals(additionalProperties.get("aaa"))) {
+ response.setCode(ResponseCode.UNKNOWN_ERROR);
+ response.setDetails("Additional properties 'aaa' has wrong value");
+ return response;
+ }
+ if (!additionalProperties.containsKey("jo")) {
+ response.setCode(ResponseCode.UNKNOWN_ERROR);
+ response.setDetails("Additional property value 'jo' doesn't exist");
+ return response;
+ }
+ Map m = (Map) additionalProperties.get("jo");
+ if (!"v1".equals(m.get("p1"))) {
+ response.setCode(ResponseCode.UNKNOWN_ERROR);
+ response.setDetails("Additional property value 'jo.p1' doesn't exist");
+ return response;
+
+ }
+ if (!"v2".equals(m.get("p2"))) {
+ response.setCode(ResponseCode.UNKNOWN_ERROR);
+ response.setDetails("Additional property value 'jo.p2' doesn't exist");
+ return response;
+ }
+ }
+ */
+ return response;
+ } catch (Throwable t) {
+ DiscoveryServiceAsyncStartResponse response = new DiscoveryServiceAsyncStartResponse();
+ response.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR);
+ response.setDetails(Utils.getExceptionAsString(t));
+ return response;
+ }
+ }
+
+ static Object lock = new Object();
+ static Map<String, Integer> runIDsRunning = new HashMap<String, Integer>();
+ static Set<String> runIDsWithError = Collections.synchronizedSet(new HashSet<String>());
+
+ // static Map<String, Integer> requestIDUnavailable = new HashMap<>();
+
+ @Override
+ public DiscoveryServiceAsyncRunStatus getStatus(String runId) {
+ String details = "Run like the wind";
+ DiscoveryServiceAsyncRunStatus.State state = DiscoveryServiceAsyncRunStatus.State.RUNNING;
+ synchronized (lock) {
+ Integer i = runIDsRunning.get(runId);
+ Assert.assertNotNull(i);
+ if (i.intValue() == 0) {
+ if (runIDsWithError.contains(runId)) {
+ state = DiscoveryServiceAsyncRunStatus.State.ERROR;
+ details = "This was a mistake";
+ } else {
+ state = DiscoveryServiceAsyncRunStatus.State.FINISHED;
+ details = "Finish him!";
+ }
+ } else {
+ runIDsRunning.put(runId, i - 1);
+ }
+ }
+
+ DiscoveryServiceAsyncRunStatus status = new DiscoveryServiceAsyncRunStatus();
+ status.setRunId(runId);
+ status.setDetails(details);
+ status.setState(state);
+ logger.info("TestAsyncDiscoveryService1.getStatus returns: " + JSONUtils.lazyJSONSerializer(status));
+
+ return status;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryServiceWritingAnnotations1.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryServiceWritingAnnotations1.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryServiceWritingAnnotations1.java
new file mode 100755
index 0000000..bd2f1a6
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryServiceWritingAnnotations1.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.discoveryservice;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService;
+import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncRunStatus;
+import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncStartResponse;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+
+/**
+ * Asynchronous test discovery service that writes annotations from a background thread.
+ * The annotations themselves are created by
+ * {@link TestSyncDiscoveryServiceWritingAnnotations1#createAnnotations}.
+ */
+public class TestAsyncDiscoveryServiceWritingAnnotations1 extends DiscoveryServiceBase implements AsyncDiscoveryService {
+
+    static Logger logger = ODFTestLogger.get();
+
+    // Run ID -> worker thread of that run.
+    static Map<String, MyThread> id2Thread = Collections.synchronizedMap(new HashMap<String, MyThread>());
+
+    /**
+     * Worker thread that creates the annotations for one run and keeps the outcome.
+     */
+    class MyThread extends Thread {
+
+        // Written by this thread in run() and read by the thread calling getStatus();
+        // volatile guarantees that the reader sees the value written by the worker.
+        volatile String errorMessage = null;
+        String correlationId;
+        MetaDataObjectReference dataSetRef;
+
+        public MyThread(MetaDataObjectReference dataSetRef, String correlationId) {
+            super();
+            this.dataSetRef = dataSetRef;
+            this.correlationId = correlationId;
+        }
+
+        @Override
+        public void run() {
+            // createAnnotations returns null on success and an error description otherwise
+            this.errorMessage = TestSyncDiscoveryServiceWritingAnnotations1.createAnnotations(dataSetRef, correlationId, metadataStore, annotationStore);
+        }
+
+    }
+
+    /**
+     * Starts a background thread that writes the annotations for the request's data set and
+     * returns immediately with a newly generated run ID.
+     *
+     * @param request the analysis request; its additional properties supply the correlation ID
+     * @return a response with code OK and the run ID to poll with {@link #getStatus(String)}
+     */
+    @Override
+    public DiscoveryServiceAsyncStartResponse startAnalysis(DiscoveryServiceRequest request) {
+        DiscoveryServiceAsyncStartResponse response = new DiscoveryServiceAsyncStartResponse();
+        MetaDataObjectReference dataSetRef = request.getDataSetContainer().getDataSet().getReference();
+
+        String newRunID = "RunId-" + this.getClass().getSimpleName() + "-" + UUID.randomUUID().toString();
+        MyThread t = new MyThread(dataSetRef, (String) request.getAdditionalProperties().get(TestSyncDiscoveryServiceWritingAnnotations1.REQUEST_PROPERTY_CORRELATION_ID));
+        // register the thread before it runs so that the run ID is known as soon as work begins
+        id2Thread.put(newRunID, t);
+        t.start();
+        response.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+        response.setRunId(newRunID);
+        response.setDetails("Thread started");
+        logger.info("Analysis writing annotations has started");
+
+        return response;
+    }
+
+    /**
+     * Reports the state of a run: NOT_FOUND for an unknown run ID, RUNNING while the worker
+     * thread has not terminated, and afterwards ERROR (with the error message as details) or
+     * FINISHED.
+     *
+     * @param runId the ID returned by {@link #startAnalysis(DiscoveryServiceRequest)}
+     * @return the current status of the run
+     */
+    @Override
+    public DiscoveryServiceAsyncRunStatus getStatus(String runId) {
+        DiscoveryServiceAsyncRunStatus status = new DiscoveryServiceAsyncRunStatus();
+
+        MyThread t = id2Thread.get(runId);
+        status.setRunId(runId);
+        if (t == null) {
+            status.setState(DiscoveryServiceAsyncRunStatus.State.NOT_FOUND);
+        } else {
+            java.lang.Thread.State ts = t.getState();
+            if (!ts.equals(java.lang.Thread.State.TERMINATED)) {
+                status.setState(DiscoveryServiceAsyncRunStatus.State.RUNNING);
+            } else {
+                // read the volatile field once so that the check and the details agree
+                String errorMessage = t.errorMessage;
+                if (errorMessage != null) {
+                    status.setState(DiscoveryServiceAsyncRunStatus.State.ERROR);
+                    status.setDetails(errorMessage);
+                } else {
+                    status.setState(DiscoveryServiceAsyncRunStatus.State.FINISHED);
+                    status.setDetails("All went fine");
+                }
+            }
+        }
+        logger.info("Status of analysis with annotations: " + status.getState() + ", " + status.getDetails());
+        return status;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryService1.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryService1.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryService1.java
new file mode 100755
index 0000000..9ea92f3
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryService1.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.discoveryservice;
+
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+
+/**
+ * Synchronous test discovery service. Every second call to
+ * {@link #runAnalysis(DiscoveryServiceRequest)} is answered with TEMPORARILY_UNAVAILABLE,
+ * and data sets whose ID starts with "error" are answered with UNKNOWN_ERROR.
+ */
+public class TestSyncDiscoveryService1 extends DiscoveryServiceBase implements SyncDiscoveryService {
+    // Counts runAnalysis calls: even values are answered, odd ones get TEMPORARILY_UNAVAILABLE.
+    static int unavailableCounter = 0;
+
+    Logger logger = ODFTestLogger.get();
+
+    /**
+     * Runs the simulated analysis.
+     *
+     * @param request the analysis request
+     * @return the response of the simulated analysis; if anything is thrown while handling the
+     *         request (including failed assertions on the request's user and additional
+     *         properties), an UNKNOWN_ERROR response carrying the exception text is returned
+     *         instead of {@code null}
+     */
+    @Override
+    public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
+        try {
+            DiscoveryServiceResponse.ResponseCode code = DiscoveryServiceResponse.ResponseCode.TEMPORARILY_UNAVAILABLE;
+            String details = "Cannot answer right now synchronously";
+            if (unavailableCounter % 2 == 0) {
+                code = DiscoveryServiceResponse.ResponseCode.OK;
+                details = "Everything's peachy and synchronous";
+            }
+            unavailableCounter++;
+            DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
+            response.setDetails(details);
+            response.setCode(code);
+            if (code == DiscoveryServiceResponse.ResponseCode.OK) {
+                String dataSetId = request.getDataSetContainer().getDataSet().getReference().getId();
+                if (dataSetId.startsWith("error")) {
+                    response.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR);
+                    response.setDetails("Something went synchronously wrong!");
+                } else {
+                    response.setDetails("All is synchronously fine!");
+                }
+                TestAsyncDiscoveryService1.checkUserAndAdditionalProperties(request);
+            }
+            logger.info(this.getClass().getSimpleName() + " service returned with code: " + response.getCode());
+            return response;
+        } catch (Throwable t) {
+            // Report the failure to the caller the same way TestAsyncDiscoveryService1 does,
+            // rather than handing back null. Fully qualified names avoid new imports.
+            logger.log(java.util.logging.Level.SEVERE, this.getClass().getSimpleName() + " has failed", t);
+            DiscoveryServiceSyncResponse errorResponse = new DiscoveryServiceSyncResponse();
+            errorResponse.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR);
+            errorResponse.setDetails(org.apache.atlas.odf.core.Utils.getExceptionAsString(t));
+            return errorResponse;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryServiceWritingAnnotations1.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryServiceWritingAnnotations1.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryServiceWritingAnnotations1.java
new file mode 100755
index 0000000..62c7bf6
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryServiceWritingAnnotations1.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.discoveryservice;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.wink.json4j.JSONObject;
+import org.junit.Assert;
+
+import org.apache.atlas.odf.api.metadata.models.CachedMetadataStore;
+import org.apache.atlas.odf.api.metadata.models.DataSet;
+import org.apache.atlas.odf.api.metadata.models.MetaDataCache;
+import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
+import org.apache.atlas.odf.api.metadata.models.RelationalDataSet;
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+
+/**
+ * Synchronous test discovery service that stores a fixed number of profiling annotations on
+ * the analyzed data set and compares the metadata cache of the request with the metadata store.
+ */
+public class TestSyncDiscoveryServiceWritingAnnotations1 extends DiscoveryServiceBase implements SyncDiscoveryService {
+
+    static Logger logger = Logger.getLogger(TestSyncDiscoveryServiceWritingAnnotations1.class.getName());
+
+    /**
+     * Checks the metadata cache of the request, if there is one. For a relational data set the
+     * column references found through the cache must be exactly those returned by the metadata
+     * store; a mismatch fails a JUnit assertion.
+     *
+     * @param request the analysis request
+     * @return always {@code null}; problems are reported through assertions, not the return value
+     */
+    public static String checkMetaDataCache(DiscoveryServiceRequest request) {
+        logger.info("Checking metadata cache");
+        MetaDataObject mdo = request.getDataSetContainer().getDataSet();
+        MetaDataCache cache = request.getDataSetContainer().getMetaDataCache();
+        if (cache == null) {
+            return null;
+        }
+        CachedMetadataStore cacheReader = new CachedMetadataStore(cache);
+
+        if (mdo instanceof RelationalDataSet) {
+            logger.info("Checking metadata cache for columns...");
+            RelationalDataSet rds = (RelationalDataSet) mdo;
+            Set<MetaDataObjectReference> cachedColumns = new HashSet<>();
+            Set<MetaDataObjectReference> actualColumns = new HashSet<>();
+            for (MetaDataObject col : cacheReader.getColumns(rds)) {
+                cachedColumns.add(col.getReference());
+            }
+            MetadataStore mds = new ODFFactory().create().getMetadataStore();
+            for (MetaDataObject col : mds.getColumns(rds)) {
+                actualColumns.add(col.getReference());
+            }
+            Assert.assertTrue("Columns missing from metadata cache.", cachedColumns.containsAll(actualColumns));
+            Assert.assertTrue("Too many columns in metadata cache.", actualColumns.containsAll(cachedColumns));
+        }
+        return null;
+    }
+
+    /**
+     * Creates the test annotations for the request's data set and then checks the metadata cache.
+     *
+     * @param request the analysis request; its additional properties supply the correlation ID
+     * @return a response with code OK, or UNKNOWN_ERROR with the error message as details
+     */
+    @Override
+    public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
+        logger.info("Analysis started on sync test service with annotations ");
+        String errorMessage = createAnnotations( //
+                request.getDataSetContainer().getDataSet().getReference(), //
+                (String) request.getAdditionalProperties().get(REQUEST_PROPERTY_CORRELATION_ID), //
+                metadataStore, //
+                annotationStore);
+        if (errorMessage == null) {
+            errorMessage = checkMetaDataCache(request);
+        }
+        DiscoveryServiceSyncResponse resp = new DiscoveryServiceSyncResponse();
+        if (errorMessage == null) {
+            resp.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+            resp.setDetails("Annotations created successfully");
+        } else {
+            resp.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR);
+            resp.setDetails(errorMessage);
+        }
+        logger.info("Analysis finished on sync test service with annotations ");
+
+        return resp;
+    }
+
+    // Key of the correlation ID, both in the request's additional properties and in the annotation JSON.
+    public static final String REQUEST_PROPERTY_CORRELATION_ID = "REQUEST_PROPERTY_CORRELATION_ID";
+
+    // Prefixes of the per-annotation values; the annotation index is appended to each of them.
+    static final String ANNOTATION_TYPE = "AnnotationType-" + TestSyncDiscoveryServiceWritingAnnotations1.class.getSimpleName();
+    static final String JSON_ATTRIBUTE = "Attribute-" + TestSyncDiscoveryServiceWritingAnnotations1.class.getSimpleName();
+    static final String JSON_VALUE = "Value-" + TestSyncDiscoveryServiceWritingAnnotations1.class.getSimpleName();
+
+    /**
+     * @return the number of annotations written by {@link #createAnnotations}
+     */
+    public static int getNumberOfAnnotations() {
+        return 3;
+    }
+
+    /**
+     * @param i index of the annotation
+     * @return annotation type, JSON attribute name and JSON attribute value of the i-th annotation, in this order
+     */
+    public static String[] getPropsOfNthAnnotation(int i) {
+        return new String[] { ANNOTATION_TYPE + i, JSON_ATTRIBUTE + i, JSON_VALUE + i };
+    }
+
+    /**
+     * Stores {@link #getNumberOfAnnotations()} profiling annotations on the given data set.
+     *
+     * @param dataSetRef    reference of the data set to annotate
+     * @param correlationId value stored in each annotation under {@link #REQUEST_PROPERTY_CORRELATION_ID}
+     * @param mds           metadata store used to look up the data set
+     * @param as            annotation store the annotations are written to
+     * @return {@code null} on success, otherwise a description of what went wrong
+     */
+    public static String createAnnotations(MetaDataObjectReference dataSetRef, String correlationId, MetadataStore mds, AnnotationStore as) {
+        try {
+            TestSyncDiscoveryServiceWritingAnnotations1.logger.info("Analysis will run on data set ref: " + dataSetRef);
+            MetaDataObject dataSet = mds.retrieve(dataSetRef);
+
+            String errorMessage = null;
+            if (dataSet == null) {
+                errorMessage = "Data set with id " + dataSetRef + " could not be retrieved";
+                TestSyncDiscoveryServiceWritingAnnotations1.logger.severe(errorMessage);
+                return errorMessage;
+            }
+
+            if (!(dataSet instanceof DataSet)) {
+                errorMessage = "Object with id " + dataSetRef + " is not a data set";
+                TestSyncDiscoveryServiceWritingAnnotations1.logger.severe(errorMessage);
+                return errorMessage;
+            }
+
+            // add some annotations
+            for (int i = 0; i < getNumberOfAnnotations(); i++) {
+                String[] annotValues = getPropsOfNthAnnotation(i);
+                ProfilingAnnotation annotation1 = new ProfilingAnnotation();
+                annotation1.setProfiledObject(dataSetRef);
+                annotation1.setAnnotationType(annotValues[0]);
+                JSONObject jo1 = new JSONObject();
+                jo1.put(annotValues[1], annotValues[2]);
+                jo1.put(REQUEST_PROPERTY_CORRELATION_ID, correlationId);
+                annotation1.setJsonProperties(jo1.write());
+
+                // PG: dynamic type creation disabled (types are already created statically)
+                MetaDataObjectReference resultRef1 = as.store(annotation1);
+                if (resultRef1 == null) {
+                    throw new RuntimeException("Annotation object " + i + " could not be created");
+                }
+            }
+
+            TestSyncDiscoveryServiceWritingAnnotations1.logger.info("Discovery service " + TestSyncDiscoveryServiceWritingAnnotations1.class.getSimpleName() + " created annotations successfully");
+        } catch (Throwable exc) {
+            // the logger records the stack trace, so no separate printStackTrace() is needed
+            TestSyncDiscoveryServiceWritingAnnotations1.logger.log(Level.WARNING, TestSyncDiscoveryServiceWritingAnnotations1.class.getSimpleName() + " has failed", exc);
+            return "Failed: " + Utils.getExceptionAsString(exc);
+        }
+        return null;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ODFVersionTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ODFVersionTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ODFVersionTest.java
new file mode 100755
index 0000000..2e6d012
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ODFVersionTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.engine;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.engine.ODFVersion;
+import org.apache.atlas.odf.core.test.TimerTestBase;
+
+public class ODFVersionTest extends TimerTestBase {
+ @Test
+ public void testVersion() {
+ ODFVersion version = new ODFFactory().create().getEngineManager().getVersion();
+ Assert.assertNotNull(version);
+ Assert.assertTrue(version.getVersion().startsWith("1.2.0-"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ShutdownTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ShutdownTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ShutdownTest.java
new file mode 100755
index 0000000..465eb5c
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ShutdownTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.engine;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State;
+import org.apache.atlas.odf.api.engine.EngineManager;
+import org.apache.atlas.odf.api.engine.ODFEngineOptions;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.core.test.controlcenter.ODFAPITest;
+
+/**
+ * Tests shutting down and restarting the ODF engine through the engine manager.
+ */
+public class ShutdownTest extends ODFTestBase {
+
+    /**
+     * Runs a request that is expected to finish and asserts that at least three threads are
+     * reported as running by the thread manager afterwards.
+     */
+    private void runAndTestThreads() throws Exception {
+        ODFAPITest.runRequestAndCheckResult("successID", State.FINISHED, -1);
+        ThreadManager tm = new ODFInternalFactory().create(ThreadManager.class);
+        int numThreads = tm.getNumberOfRunningThreads();
+        log.info("--- Number of running threads: " + numThreads);
+        Assert.assertTrue(numThreads >= 3);
+    }
+
+    /**
+     * Polls the thread manager once per second until it reports no running threads or the
+     * maximum wait time has elapsed.
+     *
+     * @param tm             thread manager to poll
+     * @param maxWaitSeconds maximum number of one-second waits
+     * @return the number of one-second waits performed; equal to maxWaitSeconds on timeout
+     */
+    private int waitForThreadsToStop(ThreadManager tm, int maxWaitSeconds) throws InterruptedException {
+        int waitedSeconds = 0;
+        while (tm.getNumberOfRunningThreads() > 0 && waitedSeconds < maxWaitSeconds) {
+            waitedSeconds++;
+            Thread.sleep(1000);
+        }
+        return waitedSeconds;
+    }
+
+    /**
+     * Shuts the engine down without restart, checks that all threads stop within 60 seconds and
+     * that a request can be run afterwards, then shuts down with restart and checks that more
+     * than two threads are running again.
+     */
+    @Test
+    public void testShutdown() throws Exception {
+
+        log.info("--- Running some request before shutdown...");
+        runAndTestThreads();
+
+        ThreadManager tm = new ODFInternalFactory().create(ThreadManager.class);
+        log.info("--- Number of threads before shutdown: " + tm.getNumberOfRunningThreads());
+
+        EngineManager engineManager = new ODFFactory().create().getEngineManager();
+        ODFEngineOptions options = new ODFEngineOptions();
+        options.setRestart(false);
+        int numThreads = tm.getNumberOfRunningThreads();
+        log.info("--- Number of threads before restart: " + numThreads);
+
+        engineManager.shutdown(options);
+        log.info("--- Shutdown requested...");
+        int maxWait = 60;
+        log.info("--- Shutdown requested, waiting for max " + maxWait + " seconds");
+        int waitCnt = waitForThreadsToStop(tm, maxWait);
+        log.info("--- Shutdown should be done by now, waited for " + waitCnt + " seconds, threads: " + tm.getNumberOfRunningThreads());
+        // reaching the maximum wait count means the threads did not stop in time
+        Assert.assertNotEquals("Threads did not stop within " + maxWait + " seconds after shutdown", maxWait, waitCnt);
+
+        // the engine is not started explicitly here; the request below is expected to work after the shutdown
+        log.info("--- Rerunning request after shutdown...");
+        runAndTestThreads();
+
+        int nrOfThreads = tm.getNumberOfRunningThreads();
+        options.setRestart(true);
+        engineManager.shutdown(options);
+        // allow two seconds per thread that was running before the restart
+        maxWait = nrOfThreads * 2;
+        log.info("--- Restart requested..., wait for a maximum of " + (maxWait * 1000) + " ms");
+        waitForThreadsToStop(tm, maxWait);
+        log.info("--- Restart should be done by now");
+        // give the restarted engine time to bring its threads up again
+        Thread.sleep(5000);
+        numThreads = tm.getNumberOfRunningThreads();
+        log.info("--- Number of threads after restart: " + numThreads);
+        Assert.assertTrue(numThreads > 2);
+        log.info("--- testShutdown finished");
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/messaging/MockQueueManager.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/messaging/MockQueueManager.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/messaging/MockQueueManager.java
new file mode 100755
index 0000000..c2be180
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/messaging/MockQueueManager.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.engine.MessagingStatus;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage;
+import org.apache.atlas.odf.core.controlcenter.AdminQueueProcessor;
+import org.apache.atlas.odf.core.controlcenter.ConfigChangeQueueProcessor;
+import org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore;
+import org.apache.atlas.odf.core.controlcenter.DiscoveryServiceStarter;
+import org.apache.atlas.odf.core.controlcenter.ExecutorServiceFactory;
+import org.apache.atlas.odf.core.controlcenter.ODFRunnable;
+import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes;
+import org.apache.atlas.odf.core.controlcenter.StatusQueueEntry;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager.ThreadStartupResult;
+import org.apache.atlas.odf.core.controlcenter.TrackerUtil;
+import org.apache.atlas.odf.json.JSONUtils;
+
+ /**
+  * In-memory replacement for the messaging layer, used by the ODF core tests.
+  * <p>
+  * The admin queue, the status queue and one request queue per service runtime are kept in static
+  * lists. Each queue is read by a {@link MockQueueListener} that is started through the
+  * {@link ThreadManager}. Every object is cloned via JSON before it is put on a queue.
+  */
+ public class MockQueueManager implements DiscoveryServiceQueueManager {
+
+     static Logger logger = Logger.getLogger(MockQueueManager.class.getName());
+
+     /** Guards {@link #runtimeQueues} and the start of the per-runtime listeners in {@link #enqueue}. */
+     static Object lock = new Object();
+
+     static List<AdminMessage> adminQueue = Collections.synchronizedList(new ArrayList<AdminMessage>());
+     static List<StatusQueueEntry> statusQueue = Collections.synchronizedList(new ArrayList<StatusQueueEntry>());
+     /** Request queues keyed by runtime name; only accessed while holding {@link #lock}. */
+     static Map<String, List<AnalysisRequestTracker>> runtimeQueues = new HashMap<>();
+
+     ThreadManager threadManager;
+
+     /**
+      * Creates the thread manager through the {@link ODFInternalFactory} and hands it a freshly
+      * created executor service. No listener is started here, see {@link #start()}.
+      */
+     public MockQueueManager() {
+         ODFInternalFactory factory = new ODFInternalFactory();
+         ExecutorServiceFactory esf = factory.create(ExecutorServiceFactory.class);
+         threadManager = factory.create(ThreadManager.class);
+         threadManager.setExecutorService(esf.createExecutorService());
+         //initialize();
+     }
+
+     /**
+      * Starts the listeners for the admin queue (admin and config-change processors) and for the
+      * status queue. If at least one new thread was created, waits up to 5 seconds for the
+      * threads to become ready; a timeout is only logged as a warning.
+      */
+     @Override
+     public void start() throws TimeoutException {
+         logger.info("Initializing MockQueueManager");
+         List<ThreadStartupResult> threads = new ArrayList<ThreadStartupResult>();
+         ThreadStartupResult startUnmanagedThread = this.threadManager.startUnmanagedThread("MOCKADMIN", createQueueListener("Admin", adminQueue, new AdminQueueProcessor(), false));
+         boolean threadCreated = startUnmanagedThread.isNewThreadCreated();
+         threads.add(startUnmanagedThread);
+         startUnmanagedThread = this.threadManager.startUnmanagedThread("MOCKADMINCONFIGCHANGE",
+                 createQueueListener("AdminConfig", adminQueue, new ConfigChangeQueueProcessor(), false));
+         threadCreated |= startUnmanagedThread.isNewThreadCreated();
+         threads.add(startUnmanagedThread);
+         // the status store listener replays the status queue from its first entry
+         startUnmanagedThread = this.threadManager.startUnmanagedThread("MOCKSTATUSSTORE",
+                 createQueueListener("StatusStore", statusQueue, new DefaultStatusQueueStore.StatusQueueProcessor(), true));
+         threadCreated |= startUnmanagedThread.isNewThreadCreated();
+         threads.add(startUnmanagedThread);
+
+         logger.info("New thread created: " + threadCreated);
+         if (threadCreated) {
+             try {
+                 this.threadManager.waitForThreadsToBeReady(5000, threads);
+                 logger.info("All threads ready");
+             } catch (TimeoutException e) {
+                 final String message = "Not all threads were created on time";
+                 logger.warning(message);
+             }
+         }
+     }
+
+     /** Asks the thread manager to shut down the three listeners started by {@link #start()}. */
+     @Override
+     public void stop() {
+         threadManager.shutdownThreads(Arrays.asList("MOCKADMIN", "MOCKADMINCONFIGCHANGE", "MOCKSTATUSSTORE"));
+     }
+
+     /**
+      * Returns a copy of the given object created through {@link JSONUtils#cloneJSONObject}.
+      *
+      * @throws RuntimeException wrapping the {@link JSONException} if cloning fails
+      */
+     <T> T cloneObject(T obj) {
+         try {
+             return JSONUtils.cloneJSONObject(obj);
+         } catch (JSONException e) {
+             throw new RuntimeException(e);
+         }
+     }
+
+     /**
+      * Puts a copy of the tracker on the queue of the runtime that hosts the tracker's current
+      * discovery service. The queue is created on first use and a listener for it is requested
+      * from the thread manager on every call.
+      *
+      * @throws RuntimeException if the tracker has no current discovery service request or if no
+      *         runtime exists for the discovery service
+      */
+     @Override
+     public void enqueue(AnalysisRequestTracker tracker) {
+         tracker = cloneObject(tracker);
+         DiscoveryServiceRequest dsRequest = TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker);
+         if (dsRequest == null) {
+             throw new RuntimeException("Tracker is finished, should not be enqueued");
+         }
+         String dsID = dsRequest.getDiscoveryServiceId();
+         dsRequest.setPutOnRequestQueue(System.currentTimeMillis());
+         synchronized (lock) {
+             ServiceRuntime runtime = ServiceRuntimes.getRuntimeForDiscoveryService(dsID);
+             if (runtime == null) {
+                 throw new RuntimeException(MessageFormat.format("Runtime of discovery service ''{0}'' does not exist", dsID));
+             }
+             String runtimeName = runtime.getName();
+             List<AnalysisRequestTracker> mq = runtimeQueues.get(runtimeName);
+             if (mq == null) {
+                 mq = Collections.synchronizedList(new ArrayList<AnalysisRequestTracker>());
+                 runtimeQueues.put(runtimeName, mq);
+             }
+             // The listener is created before the tracker is added: a new listener starts at the
+             // current end of the queue, so it will pick up the tracker added below.
+             boolean started = this.threadManager.startUnmanagedThread("MOCK" + runtimeName, createQueueListener("Starter" + runtimeName, mq, new DiscoveryServiceStarter(), false))
+                     .isNewThreadCreated();
+             logger.info("New thread created for runtime " + runtimeName + ", started: " + started + ", current queue length: " + mq.size());
+             mq.add(tracker);
+         }
+     }
+
+     /**
+      * Polls a list that serves as a queue and passes every new element, serialized to JSON, to a
+      * {@link QueueMessageProcessor}. Elements are never removed from the list; the listener only
+      * advances its own read index.
+      */
+     static class MockQueueListener implements ODFRunnable {
+         String name;
+         QueueMessageProcessor processor;
+         List<?> queue;
+         // volatile: cancel() is expected to be called from a different thread than the one
+         // executing run(), and the loop must see the update
+         volatile boolean cancelled = false;
+         ExecutorService service;
+         int index = 0;
+
+         /**
+          * @param name name used in log messages
+          * @param q list to read from
+          * @param qmp processor that receives each element as a JSON string
+          * @param fromBeginning if true, start at the first element of the list, otherwise only
+          *        process elements added after this listener was created
+          */
+         public MockQueueListener(String name, List<?> q, QueueMessageProcessor qmp, boolean fromBeginning) {
+             this.name = name;
+             this.processor = qmp;
+             this.queue = q;
+             if (fromBeginning) {
+                 index = 0;
+             } else {
+                 index = q.size();
+             }
+         }
+
+         /** Pause in milliseconds between two checks of an exhausted queue. */
+         long WAITTIMEMS = 100;
+
+         /** Returns true if there is an unprocessed element at the current read index. */
+         boolean isValidIndex() {
+             return index >= 0 && index < queue.size();
+         }
+
+         /**
+          * Processes queue elements in order until the listener is cancelled or interrupted, or
+          * until an element cannot be serialized to JSON.
+          */
+         @Override
+         public void run() {
+             logger.info("MockQueueManager thread " + name + " started");
+
+             while (!cancelled) {
+                 // logger.info("Queue consumer " + name + ": checking index " + index + " on queue of size " + queue.size());
+                 if (!isValidIndex()) {
+                     try {
+                         Thread.sleep(WAITTIMEMS);
+                     } catch (InterruptedException e) {
+                         // An interrupt is a request to stop: keep the interrupt status for the
+                         // owner of the thread and leave the loop instead of polling on.
+                         Thread.currentThread().interrupt();
+                         logger.info("MockQueueManager thread " + name + " interrupted, stopping");
+                         cancelled = true;
+                     }
+                 } else {
+                     Object obj = queue.get(index);
+                     String msg;
+                     try {
+                         msg = JSONUtils.toJSON(obj);
+                     } catch (JSONException e) {
+                         e.printStackTrace();
+                         cancelled = true;
+                         return;
+                     }
+                     this.processor.process(service, msg, 0, index);
+                     logger.finest("MockQConsumer " + name + ": Processed message: " + msg);
+                     index++;
+                 }
+             }
+             logger.info("MockQueueManager thread finished");
+
+         }
+
+
+         @Override
+         public void setExecutorService(ExecutorService service) {
+             this.service = service;
+         }
+
+         /** Requests the polling loop in {@link #run()} to end. */
+         @Override
+         public void cancel() {
+             cancelled = true;
+         }
+
+         @Override
+         public boolean isReady() {
+             return true;
+         }
+
+     }
+
+     /** Creates a {@link MockQueueListener} for the given queue and processor. */
+     ODFRunnable createQueueListener(String name, List<?> queue, QueueMessageProcessor qmp, boolean fromBeginning) {
+         return new MockQueueListener(name, queue, qmp, fromBeginning);
+     }
+
+     /** Appends a copy of the entry to the status queue. */
+     @Override
+     public void enqueueInStatusQueue(StatusQueueEntry sqe) {
+         sqe = cloneObject(sqe);
+         statusQueue.add(sqe);
+     }
+
+     /** Appends a copy of the message to the admin queue. */
+     @Override
+     public void enqueueInAdminQueue(AdminMessage message) {
+         message = cloneObject(message);
+         adminQueue.add(message);
+     }
+
+     /** Messaging status of the mock, carrying a single free-text message. */
+     public static class MockMessagingStatus extends MessagingStatus {
+         String message;
+
+         public String getMessage() {
+             return message;
+         }
+
+         public void setMessage(String message) {
+             this.message = message;
+         }
+
+     }
+
+     /** Always reports a {@link MockMessagingStatus} with the message "OK". */
+     @Override
+     public MessagingStatus getMessagingStatus() {
+         MockMessagingStatus mms = new MockMessagingStatus();
+         mms.setMessage("OK");
+         return mms;
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/NotificationManagerTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/NotificationManagerTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/NotificationManagerTest.java
new file mode 100755
index 0000000..f69513c
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/NotificationManagerTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.notification;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.atlas.odf.api.OpenDiscoveryFramework;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.notification.NotificationListener;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.core.notification.NotificationManager;
+import org.apache.atlas.odf.core.test.controlcenter.ODFAPITest;
+
+ /**
+  * Checks that status updates of an analysis request reach the listeners registered through the
+  * {@link TestNotificationManager}.
+  */
+ public class NotificationManagerTest extends ODFTestBase {
+
+     /**
+      * Runs a request, waits for it, and then polls the trackers collected by
+      * {@link TestNotificationManager} until one with the request's ID and status FINISHED has
+      * been received or the poll budget is used up.
+      */
+     @Test
+     public void testNotifications() throws Exception {
+         NotificationManager nm = new ODFInternalFactory().create(NotificationManager.class);
+         Assert.assertNotNull(nm);
+         log.info("Notification manager found " + nm.getClass().getName());
+         Assert.assertTrue(nm instanceof TestNotificationManager);
+         List<NotificationListener> listeners = nm.getListeners();
+         Assert.assertTrue(listeners.size() > 0);
+
+         OpenDiscoveryFramework odf = new ODFFactory().create();
+         List<String> dataSetIDs = Collections.singletonList("successID");
+         String id = ODFAPITest.runRequest(dataSetIDs, odf.getAnalysisManager());
+         ODFAPITest.waitForRequest(id, odf.getAnalysisManager());
+
+         int polls = 20;
+         boolean found = false;
+         boolean foundFinished = false;
+         // Poll until a FINISHED tracker of the request was seen. Stopping as soon as any tracker
+         // of the request shows up would fail the test whenever an intermediate status is
+         // delivered before the final one.
+         while (!foundFinished && polls > 0) {
+             // now check that trackers were found through the notification mechanism
+             log.info("Checking that trackers were consumed, " + polls + " seconds left");
+             // work on a copy so that the listener can keep adding trackers while we iterate
+             List<AnalysisRequestTracker> trackers = new ArrayList<>(TestNotificationManager.receivedTrackers);
+             log.info("Received trackers: " + trackers.size());
+             for (AnalysisRequestTracker tracker : trackers) {
+                 String foundId = tracker.getRequest().getId();
+                 if (foundId.equals(id)) {
+                     found = true;
+                     if (STATUS.FINISHED.equals(tracker.getStatus())) {
+                         foundFinished = true;
+                     }
+                 }
+             }
+             polls--;
+             if (!foundFinished && polls > 0) {
+                 Thread.sleep(1000);
+             }
+         }
+         Assert.assertTrue(found);
+         Assert.assertTrue(foundFinished);
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/TestNotificationManager.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/TestNotificationManager.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/TestNotificationManager.java
new file mode 100755
index 0000000..80252d6
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/TestNotificationManager.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.notification;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.atlas.odf.api.OpenDiscoveryFramework;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.core.controlcenter.StatusQueueEntry;
+import org.apache.atlas.odf.core.notification.NotificationListener;
+import org.apache.atlas.odf.core.notification.NotificationManager;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+
+ /**
+  * Notification manager for tests. It registers a single listener that records every analysis
+  * request tracker found in the events it receives.
+  */
+ public class TestNotificationManager implements NotificationManager {
+
+     /** Trackers extracted from the events received by {@link TestListener1}, in arrival order. */
+     public static List<AnalysisRequestTracker> receivedTrackers = Collections.synchronizedList(new ArrayList<AnalysisRequestTracker>());
+
+     /** Listener on the topic "odf-status-topic" that stores the tracker of each status entry. */
+     public static class TestListener1 implements NotificationListener {
+
+         @Override
+         public String getName() {
+             return getClass().getName();
+         }
+
+         @Override
+         public String getTopicName() {
+             return "odf-status-topic";
+         }
+
+         /**
+          * Parses the event as a {@link StatusQueueEntry} and, if the entry carries a tracker,
+          * appends the tracker to {@link TestNotificationManager#receivedTrackers}.
+          *
+          * @throws RuntimeException wrapping the {@link JSONException} if the event cannot be parsed
+          */
+         @Override
+         public void onEvent(String event, OpenDiscoveryFramework odf) {
+             AnalysisRequestTracker received;
+             try {
+                 received = JSONUtils.fromJSON(event, StatusQueueEntry.class).getAnalysisRequestTracker();
+             } catch (JSONException jsonProblem) {
+                 throw new RuntimeException(jsonProblem);
+             }
+             if (received == null) {
+                 return;
+             }
+             receivedTrackers.add(received);
+         }
+
+     }
+
+     /** Returns a new, modifiable list that contains one new {@link TestListener1}. */
+     @Override
+     public List<NotificationListener> getListeners() {
+         return new ArrayList<NotificationListener>(Collections.<NotificationListener>singletonList(new TestListener1()));
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/RuntimeExtensionTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/RuntimeExtensionTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/RuntimeExtensionTest.java
new file mode 100755
index 0000000..8a8d9a8
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/RuntimeExtensionTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.runtime;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.OpenDiscoveryFramework;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.core.test.controlcenter.ODFAPITest;
+
+ /**
+  * Tests that the {@link TestServiceRuntime} is picked up as a service runtime and that requests
+  * for a service on that runtime are only processed while the runtime is not blocked.
+  */
+ public class RuntimeExtensionTest extends ODFTestBase {
+
+     static final String SERVICE_ON_TEST_RUNTIME = "testruntimeservice";
+
+     /** Returns the names of the given runtimes, in the same order. */
+     List<String> getNames(List<ServiceRuntime> rts) {
+         List<String> result = new ArrayList<>();
+         for (ServiceRuntime rt : rts) {
+             result.add(rt.getName());
+         }
+         return result;
+     }
+
+     @Test
+     public void testActiveRuntimes() {
+         List<String> allNames = getNames(ServiceRuntimes.getAllRuntimes());
+         Assert.assertTrue(allNames.contains(TestServiceRuntime.TESTSERVICERUNTIME_NAME));
+
+         List<String> activeNames = getNames(ServiceRuntimes.getActiveRuntimes());
+         Assert.assertTrue(activeNames.contains(TestServiceRuntime.TESTSERVICERUNTIME_NAME));
+     }
+
+     @Test
+     public void testRuntimeForNewService() {
+         ServiceRuntime rt = ServiceRuntimes.getRuntimeForDiscoveryService(SERVICE_ON_TEST_RUNTIME);
+         Assert.assertNotNull(rt);
+         Assert.assertEquals(TestServiceRuntime.TESTSERVICERUNTIME_NAME, rt.getName());
+     }
+
+     /** Serializes the two tests below, which both change {@link TestServiceRuntime#runtimeBlocked}. */
+     static Object lock = new Object();
+
+     /** Submits a request for the dummy data set that runs only the service on the test runtime. */
+     private AnalysisResponse runRequestOnTestRuntime(OpenDiscoveryFramework odf) throws Exception {
+         AnalysisRequest request = ODFAPITest.createAnalysisRequest(Collections.singletonList(ODFAPITest.DUMMY_SUCCESS_ID));
+         request.setDiscoveryServiceSequence(Collections.singletonList(SERVICE_ON_TEST_RUNTIME));
+         log.info("Starting service for test runtime");
+         return odf.getAnalysisManager().runAnalysis(request);
+     }
+
+     /** With the runtime unblocked, a request must finish and must have reached the test service. */
+     @Test
+     public void testRuntimeExtensionSimple() throws Exception {
+         synchronized (lock) {
+             try {
+                 OpenDiscoveryFramework odf = new ODFFactory().create();
+                 TestServiceRuntime.runtimeBlocked = false;
+                 AnalysisResponse resp = runRequestOnTestRuntime(odf);
+                 String requestId = resp.getId();
+                 Assert.assertTrue(ODFAPITest.waitForRequest(requestId, odf.getAnalysisManager(), 40, State.FINISHED));
+                 Assert.assertTrue(TestServiceRuntime.requests.contains(requestId));
+                 log.info("testRuntimeExtensionSimple finished");
+             } finally {
+                 // block runtime again to restore state before testcase, also when an assertion failed
+                 TestServiceRuntime.runtimeBlocked = true;
+             }
+             Thread.sleep(5000);
+         }
+     }
+
+     /**
+      * With the runtime blocked, a request must not reach the test service; after unblocking it
+      * must finish and must have reached the service.
+      */
+     @Test
+     public void testBlockedRuntimeExtension() throws Exception {
+         synchronized (lock) {
+             try {
+                 OpenDiscoveryFramework odf = new ODFFactory().create();
+                 TestServiceRuntime.runtimeBlocked = true;
+                 AnalysisResponse resp = runRequestOnTestRuntime(odf);
+                 String requestId = resp.getId();
+                 Assert.assertFalse(resp.isInvalidRequest());
+                 log.info("Checking that service is not called");
+                 for (int i = 0; i < 5; i++) {
+                     Assert.assertFalse(TestServiceRuntime.requests.contains(requestId));
+                     Thread.sleep(1000);
+                 }
+                 log.info("Unblocking runtime...");
+                 TestServiceRuntime.runtimeBlocked = false;
+                 Thread.sleep(5000); // give service time to start consumption
+                 log.info("Checking that request has finished");
+                 Assert.assertTrue(ODFAPITest.waitForRequest(requestId, odf.getAnalysisManager(), 40, State.FINISHED));
+                 log.info("Checking that service was called");
+                 Assert.assertTrue(TestServiceRuntime.requests.contains(requestId));
+                 log.info("testBlockedRuntimeExtension finished");
+             } finally {
+                 // block runtime again to restore state before testcase, also when an assertion failed
+                 TestServiceRuntime.runtimeBlocked = true;
+             }
+             Thread.sleep(5000);
+         }
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/TestServiceRuntime.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/TestServiceRuntime.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/TestServiceRuntime.java
new file mode 100755
index 0000000..d16e10a
--- /dev/null
+++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/TestServiceRuntime.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.runtime;
+
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.Set;
+ import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryService;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.SyncDiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+
+ /**
+  * Service runtime for tests. It can be switched between "blocked" and "available" through
+  * {@link #runtimeBlocked} and records the ODF request ID of every request its service proxy runs.
+  */
+ public class TestServiceRuntime implements ServiceRuntime {
+
+     static Logger logger = ODFTestLogger.get();
+
+     public static final String TESTSERVICERUNTIME_NAME = "TestServiceRuntime";
+
+     /**
+      * While true, {@link #getWaitTimeUntilAvailable()} reports a wait time of 1000. Tests flip
+      * this flag; volatile so that the change is visible to whichever thread queries the wait
+      * time (presumably a framework thread other than the test thread).
+      */
+     public static volatile boolean runtimeBlocked = true;
+
+     @Override
+     public String getName() {
+         return TESTSERVICERUNTIME_NAME;
+     }
+
+     /** Returns 1000 while the runtime is blocked and 0 otherwise. */
+     @Override
+     public long getWaitTimeUntilAvailable() {
+         if (runtimeBlocked) {
+             return 1000;
+         }
+         return 0;
+     }
+
+     /**
+      * ODF request IDs of all requests run by {@link DSProxy}. Synchronized because the set is
+      * filled by the service proxy and read by tests, which may happen on different threads.
+      */
+     public static Set<String> requests = Collections.synchronizedSet(new HashSet<String>());
+
+     /** Synchronous discovery service that records the request ID and always answers OK. */
+     public static class DSProxy extends SyncDiscoveryServiceBase {
+
+         @Override
+         public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
+             logger.info("Running test runtime service");
+             requests.add(request.getOdfRequestId());
+             DiscoveryServiceSyncResponse resp = new DiscoveryServiceSyncResponse();
+             resp.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+             resp.setDetails("Test success");
+             return resp;
+         }
+     }
+
+     /** Returns a new {@link DSProxy}; the given properties are not used. */
+     @Override
+     public DiscoveryService createDiscoveryServiceProxy(DiscoveryServiceProperties props) {
+         return new DSProxy();
+     }
+
+     @Override
+     public String getDescription() {
+         return "TestServiceRuntime description";
+     }
+
+     /** Accepts all properties: never throws. */
+     @Override
+     public void validate(DiscoveryServiceProperties props) throws ValidationException {
+     }
+
+ }