You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/01/22 01:51:51 UTC
[1/3] incubator-griffin git commit: fix transaction and schedule bug
and update ut
Repository: incubator-griffin
Updated Branches:
refs/heads/master 87e59a527 -> 71fcf93b9
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/measure/MeasureOrgServiceImplTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/measure/MeasureOrgServiceImplTest.java b/service/src/test/java/org/apache/griffin/core/measure/MeasureOrgServiceImplTest.java
index 7eda50e..939c0da 100644
--- a/service/src/test/java/org/apache/griffin/core/measure/MeasureOrgServiceImplTest.java
+++ b/service/src/test/java/org/apache/griffin/core/measure/MeasureOrgServiceImplTest.java
@@ -1,98 +1,114 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//
-//package org.apache.griffin.core.measure;
-//
-//
-//import org.apache.griffin.core.measure.entity.Measure;
-//import org.apache.griffin.core.measure.repo.MeasureRepo;
-//import org.junit.Test;
-//import org.junit.runner.RunWith;
-//import org.mockito.InjectMocks;
-//import org.mockito.Mock;
-//import org.springframework.test.context.junit4.SpringRunner;
-//
-//import java.io.Serializable;
-//import java.util.*;
-//
-//import static org.apache.griffin.core.util.EntityHelper.createATestGriffinMeasure;
-//import static org.apache.griffin.core.util.EntityHelper.createJobDetailMap;
-//import static org.assertj.core.api.Assertions.assertThat;
-//import static org.mockito.BDDMockito.given;
-//import static org.mockito.Mockito.when;
-//
-//@RunWith(SpringRunner.class)
-//public class MeasureOrgServiceImplTest {
-//
-// @InjectMocks
-// private MeasureOrgServiceImpl service;
-//
-// @Mock
-// private MeasureRepo measureRepo;
-//
-// @Test
-// public void testGetOrgs() {
-// String orgName = "orgName";
-// given(measureRepo.findOrganizations(false)).willReturn(Arrays.asList(orgName));
-// List<String> orgs = service.getOrgs();
-// assertThat(orgs.size()).isEqualTo(1);
-// assertThat(orgs.get(0)).isEqualTo(orgName);
-// }
-//
-// @Test
-// public void testGetMetricNameListByOrg() {
-// String orgName = "orgName";
-// String measureName = "measureName";
-// given(measureRepo.findNameByOrganization(orgName, false)).willReturn(Arrays.asList(measureName));
-// List<String> measureNames = service.getMetricNameListByOrg(orgName);
-// assertThat(measureNames.size()).isEqualTo(1);
-// assertThat(measureNames.get(0)).isEqualTo(measureName);
-// }
-//
-// @Test
-// public void testGetMeasureNamesGroupByOrg() throws Exception {
-// Measure measure = createATestGriffinMeasure("measure", "org");
-// List<Measure> measures = new ArrayList<>();
-// measures.add(measure);
-//
-// when(measureRepo.findByDeleted(false)).thenReturn(measures);
-//
-// Map<String, List<String>> map = service.getMeasureNamesGroupByOrg();
-// assertThat(map.size()).isEqualTo(1);
-//
-// }
-//
-// @Test
-// public void testMeasureWithJobDetailsGroupByOrg() throws Exception {
-// Measure measure = createATestGriffinMeasure("measure", "org");
-// measure.setId(1L);
-// given(measureRepo.findByDeleted(false)).willReturn(Arrays.asList(measure));
-//
-// Map<String, Object> jobDetail = createJobDetailMap();
-// List<Map<String, Object>> jobList = Arrays.asList(jobDetail);
-// Map<String, List<Map<String, Object>>> measuresById = new HashMap<>();
-// measuresById.put("1", jobList);
-//
-// Map<String, Map<String, List<Map<String, Object>>>> map = service.getMeasureWithJobDetailsGroupByOrg(measuresById);
-// assertThat(map.size()).isEqualTo(1);
-// assertThat(map).containsKey("org");
-// assertThat(map.get("org").get("measure")).isEqualTo(jobList);
-// }
-//
-//}
\ No newline at end of file
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.measure;
+
+
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
+import org.apache.griffin.core.measure.repo.GriffinMeasureRepo;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.griffin.core.util.EntityHelper.createGriffinMeasure;
+import static org.apache.griffin.core.util.EntityHelper.createJobDetailMap;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.when;
+
+@RunWith(SpringRunner.class)
+public class MeasureOrgServiceImplTest {
+
+ @InjectMocks
+ private MeasureOrgServiceImpl service;
+
+ @Mock
+ private GriffinMeasureRepo measureRepo;
+
+ @Test
+ public void testGetOrgs() {
+ String orgName = "orgName";
+ given(measureRepo.findOrganizations(false)).willReturn(Arrays.asList(orgName));
+ List<String> orgs = service.getOrgs();
+ assertThat(orgs.size()).isEqualTo(1);
+ assertThat(orgs.get(0)).isEqualTo(orgName);
+ }
+
+ @Test
+ public void testGetMetricNameListByOrg() {
+ String orgName = "orgName";
+ String measureName = "measureName";
+ given(measureRepo.findNameByOrganization(orgName, false)).willReturn(Arrays.asList(measureName));
+ List<String> measureNames = service.getMetricNameListByOrg(orgName);
+ assertThat(measureNames.size()).isEqualTo(1);
+ assertThat(measureNames.get(0)).isEqualTo(measureName);
+ }
+
+ @Test
+ public void testGetMeasureNamesGroupByOrg() throws Exception {
+ GriffinMeasure measure = createGriffinMeasure("measure");
+ when(measureRepo.findByDeleted(false)).thenReturn(Arrays.asList(measure));
+ Map<String, List<String>> map = service.getMeasureNamesGroupByOrg();
+ assertThat(map.size()).isEqualTo(1);
+ }
+
+ @Test
+ public void testGetMeasureNamesGroupByOrgWithNull() throws Exception {
+ when(measureRepo.findByDeleted(false)).thenReturn(null);
+ Map<String, List<String>> map = service.getMeasureNamesGroupByOrg();
+ assert map == null;
+ }
+
+ @Test
+ public void testGetMeasureWithJobDetailsGroupByOrgForSuccess() throws Exception {
+ String measureName = "measureName";
+ String measureId = "1";
+ GriffinMeasure measure = createGriffinMeasure(measureName);
+ measure.setOrganization("org");
+ measure.setId(Long.valueOf(measureId));
+ given(measureRepo.findByDeleted(false)).willReturn(Arrays.asList(measure));
+
+ Map<String, Object> jobDetail = createJobDetailMap();
+
+ List<Map<String, Object>> jobList = Arrays.asList(jobDetail);
+ Map<String, List<Map<String, Object>>> measuresById = new HashMap<>();
+ measuresById.put(measureId, jobList);
+
+ Map<String, Map<String, List<Map<String, Object>>>> map = service.getMeasureWithJobDetailsGroupByOrg(measuresById);
+ assertThat(map.size()).isEqualTo(1);
+ assertThat(map).containsKey("org");
+ assertThat(map.get("org").get(measureName)).isEqualTo(jobList);
+ }
+
+ @Test
+ public void testGetMeasureWithJobDetailsGroupByOrgForFailure() throws Exception {
+ Map detail = new HashMap();
+ given(measureRepo.findByDeleted(false)).willReturn(null);
+ Map map = service.getMeasureWithJobDetailsGroupByOrg(detail);
+ assert map == null;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java b/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java
index c49ee82..857d35f 100644
--- a/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java
+++ b/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java
@@ -21,155 +21,309 @@ package org.apache.griffin.core.measure;
import org.apache.griffin.core.job.JobServiceImpl;
+import org.apache.griffin.core.job.repo.VirtualJobRepo;
+import org.apache.griffin.core.measure.entity.DataConnector;
+import org.apache.griffin.core.measure.entity.ExternalMeasure;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
import org.apache.griffin.core.measure.entity.Measure;
import org.apache.griffin.core.measure.repo.DataConnectorRepo;
+import org.apache.griffin.core.measure.repo.ExternalMeasureRepo;
+import org.apache.griffin.core.measure.repo.GriffinMeasureRepo;
import org.apache.griffin.core.measure.repo.MeasureRepo;
import org.apache.griffin.core.util.GriffinOperationMessage;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
+import org.mockito.Matchers;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Bean;
import org.springframework.test.context.junit4.SpringRunner;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
-import static org.apache.griffin.core.util.EntityHelper.createATestGriffinMeasure;
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.apache.griffin.core.util.EntityHelper.*;
+import static org.apache.griffin.core.util.GriffinOperationMessage.*;
import static org.junit.Assert.assertEquals;
import static org.mockito.BDDMockito.given;
@RunWith(SpringRunner.class)
public class MeasureServiceImplTest {
+ @TestConfiguration
+ public static class MeasureServiceConf {
+ @Bean
+ public MeasureServiceImpl measureService() {
+ return new MeasureServiceImpl();
+ }
- @InjectMocks
+ @Bean(name = "griffinOperation")
+ public MeasureOperation griffinOperation() {
+ return new GriffinMeasureOperationImpl();
+ }
+
+ @Bean(name = "externalOperation")
+ public MeasureOperation externalOperation() {
+ return new ExternalMeasureOperationImpl();
+ }
+ }
+
+ @Autowired
private MeasureServiceImpl service;
- @Mock
- private MeasureRepo measureRepo;
- @Mock
+
+ @MockBean
+ private ExternalMeasureRepo externalMeasureRepo;
+
+ @MockBean
+ private GriffinMeasureRepo griffinMeasureRepo;
+
+ @MockBean
+ private MeasureRepo<Measure> measureRepo;
+
+ @MockBean
private JobServiceImpl jobService;
- @Mock
+ @MockBean
private DataConnectorRepo dataConnectorRepo;
+ @MockBean
+ private VirtualJobRepo jobRepo;
+
@Before
public void setup() {
}
@Test
public void testGetAllMeasures() throws Exception {
- Measure measure = createATestGriffinMeasure("view_item_hourly", "test");
+ Measure measure = createGriffinMeasure("view_item_hourly");
given(measureRepo.findByDeleted(false)).willReturn(Arrays.asList(measure));
- List<Measure> measures = (List<Measure>) service.getAllAliveMeasures();
- assertThat(measures.size()).isEqualTo(1);
- assertThat(measures.get(0).getName()).isEqualTo("view_item_hourly");
+ List<Measure> measures = service.getAllAliveMeasures();
+ assertEquals(measures.size(), 1);
+ assertEquals(measures.get(0).getName(), "view_item_hourly");
}
@Test
public void testGetMeasuresById() throws Exception {
- Measure measure = createATestGriffinMeasure("view_item_hourly", "test");
+ Measure measure = createGriffinMeasure("view_item_hourly");
given(measureRepo.findByIdAndDeleted(1L, false)).willReturn(measure);
Measure m = service.getMeasureById(1);
assertEquals(m.getName(), measure.getName());
}
+ @Test
+ public void testGetAliveMeasuresByOwner() throws Exception {
+ String owner = "test";
+ Measure measure = createGriffinMeasure("view_item_hourly");
+ given(measureRepo.findByOwnerAndDeleted(owner, false)).willReturn(Arrays.asList(measure));
+ List<Measure> measures = service.getAliveMeasuresByOwner(owner);
+ assertEquals(measures.get(0).getName(), measure.getName());
+ }
+
+
+ @Test
+ public void testDeleteMeasuresByIdForGriffinSuccess() throws Exception {
+ GriffinMeasure measure = createGriffinMeasure("view_item_hourly");
+ measure.setId(1L);
+ given(measureRepo.findByIdAndDeleted(measure.getId(), false)).willReturn(measure);
+ given(jobService.deleteJobsRelateToMeasure(measure.getId())).willReturn(true);
+ GriffinOperationMessage message = service.deleteMeasureById(measure.getId());
+ assertEquals(message, DELETE_MEASURE_BY_ID_SUCCESS);
+ }
+
+ @Test
+ public void testDeleteMeasuresByIdForGriffinFailureWithPause() throws Exception {
+ GriffinMeasure measure = createGriffinMeasure("view_item_hourly");
+ measure.setId(1L);
+ given(measureRepo.findByIdAndDeleted(measure.getId(), false)).willReturn(measure);
+ given(jobService.deleteJobsRelateToMeasure(measure.getId())).willReturn(false);
+ GriffinOperationMessage message = service.deleteMeasureById(measure.getId());
+ assertEquals(message, DELETE_MEASURE_BY_ID_FAIL);
+ }
+
+ @Test
+ public void testDeleteMeasuresByIdForGriffinFailureWithException() throws Exception {
+ GriffinMeasure measure = createGriffinMeasure("view_item_hourly");
+ measure.setId(1L);
+ given(measureRepo.findByIdAndDeleted(measure.getId(), false)).willReturn(measure);
+ given(jobService.deleteJobsRelateToMeasure(measure.getId())).willReturn(true);
+ given(measureRepo.save(Matchers.any(Measure.class))).willThrow(Exception.class);
+ GriffinOperationMessage message = service.deleteMeasureById(measure.getId());
+ assertEquals(message, DELETE_MEASURE_BY_ID_FAIL);
+ }
+
+ @Test
+ public void testDeleteMeasuresByIdForExternalSuccess() throws Exception {
+ ExternalMeasure measure = createExternalMeasure("externalMeasure");
+ measure.setId(1L);
+ given(measureRepo.findByIdAndDeleted(measure.getId(), false)).willReturn(measure);
+ GriffinOperationMessage message = service.deleteMeasureById(measure.getId());
+ assertEquals(message, DELETE_MEASURE_BY_ID_SUCCESS);
+ }
-// @Test
-// public void testDeleteMeasuresByIdForSuccess() throws Exception {
-// GriffinMeasure measure = createATestGriffinMeasure("view_item_hourly", "test");
-// given(measureRepo.findByIdAndDeleted(measure.getId(),false)).willReturn(measure);
-// given(jobService.deleteJobsRelateToMeasure(measure.getId())).willReturn(true);
-// GriffinOperationMessage message = service.deleteMeasureById(measure.getId());
-// assertEquals(message, GriffinOperationMessage.DELETE_MEASURE_BY_ID_SUCCESS);
-// }
+ @Test
+ public void testDeleteMeasuresByIdForExternalFailureWithException() throws Exception {
+ ExternalMeasure measure = createExternalMeasure("externalMeasure");
+ measure.setId(1L);
+ given(measureRepo.findByIdAndDeleted(measure.getId(), false)).willReturn(measure);
+ given(externalMeasureRepo.save(Matchers.any(ExternalMeasure.class))).willThrow(Exception.class);
+ GriffinOperationMessage message = service.deleteMeasureById(measure.getId());
+ assertEquals(message, DELETE_MEASURE_BY_ID_FAIL);
+ }
@Test
- public void testDeleteMeasuresByIdForNotFound() throws Exception {
- given(measureRepo.exists(1L)).willReturn(false);
+ public void testDeleteMeasuresByIdForFailureWithNotFound() throws Exception {
+ given(measureRepo.findByIdAndDeleted(1L, false)).willReturn(null);
GriffinOperationMessage message = service.deleteMeasureById(1L);
- assertEquals(message, GriffinOperationMessage.RESOURCE_NOT_FOUND);
- }
-
-// @Test
-// public void testCreateNewMeasureForSuccess() throws Exception {
-// String measureName = "view_item_hourly";
-// Measure measure = createATestGriffinMeasure(measureName, "test");
-// given(measureRepo.findByNameAndDeleted(measureName, false)).willReturn(new LinkedList<>());
-// given(measureRepo.save(measure)).willReturn(measure);
-// GriffinOperationMessage message = service.createMeasure(measure);
-// assertEquals(message, GriffinOperationMessage.CREATE_MEASURE_SUCCESS);
-// }
-
-// @Test
-// public void testCreateNewMeasureForFailureWithConnectorNameRepeat() throws Exception {
-// String measureName = "view_item_hourly";
-// Measure measure = createATestGriffinMeasure(measureName, "test");
-// given(measureRepo.findByNameAndDeleted(measureName, false)).willReturn(new LinkedList<>());
-// DataConnector dc = new DataConnector("name", "", "", "");
-// given(dataConnectorRepo.findByConnectorNames(Matchers.any())).willReturn(Arrays.asList(dc));
-// given(measureRepo.save(measure)).willReturn(measure);
-// GriffinOperationMessage message = service.createMeasure(measure);
-// assertEquals(message, GriffinOperationMessage.CREATE_MEASURE_FAIL);
-// }
-
- @Test
- public void testCreateNewMeasureForFailWithMeasureDuplicate() throws Exception {
+ assertEquals(message, RESOURCE_NOT_FOUND);
+ }
+
+ @Test
+ public void testCreateMeasureForGriffinSuccess() throws Exception {
String measureName = "view_item_hourly";
- Measure measure = createATestGriffinMeasure(measureName, "test");
- LinkedList<Measure> list = new LinkedList<>();
- list.add(measure);
- given(measureRepo.findByNameAndDeleted(measureName, false)).willReturn(list);
+ GriffinMeasure measure = createGriffinMeasure(measureName);
+ given(measureRepo.findByNameAndDeleted(measureName, false)).willReturn(new ArrayList<>());
GriffinOperationMessage message = service.createMeasure(measure);
- assertEquals(message, GriffinOperationMessage.CREATE_MEASURE_FAIL_DUPLICATE);
+ assertEquals(message, CREATE_MEASURE_SUCCESS);
}
-// @Test
-// public void testCreateNewMeasureForFailWithSaveException() throws Exception {
-// String measureName = "view_item_hourly";
-// Measure measure = createATestGriffinMeasure(measureName, "test");
-// given(measureRepo.findByNameAndDeleted(measureName, false)).willReturn(new LinkedList<>());
-// given(measureRepo.save(measure)).willReturn(null);
-// GriffinOperationMessage message = service.createMeasure(measure);
-// assertEquals(message, GriffinOperationMessage.CREATE_MEASURE_FAIL);
-// }
+ @Test
+ public void testCreateMeasureForGriffinFailureWithConnectorExist() throws Exception {
+ String measureName = "view_item_hourly";
+ GriffinMeasure measure = createGriffinMeasure(measureName);
+ DataConnector dc = new DataConnector("source_name", "1h", "1.2", null);
+ given(measureRepo.findByNameAndDeleted(measureName, false)).willReturn(new LinkedList<>());
+ given(dataConnectorRepo.findByConnectorNames(Arrays.asList("source_name", "target_name"))).willReturn(Arrays.asList(dc));
+ GriffinOperationMessage message = service.createMeasure(measure);
+ assertEquals(message, CREATE_MEASURE_FAIL);
+ }
@Test
- public void testGetAllMeasureByOwner() throws Exception {
- String owner = "test";
- Measure measure = createATestGriffinMeasure("view_item_hourly", "test");
- measure.setId(1L);
- given(measureRepo.findByOwnerAndDeleted(owner, false)).willReturn(Arrays.asList(measure));
- List<Measure> list = service.getAliveMeasuresByOwner(owner);
- assertEquals(list.get(0).getName(), measure.getName());
+ public void testCreateMeasureForGriffinFailureWithConnectorNull() throws Exception {
+ String measureName = "view_item_hourly";
+ DataConnector dcSource = createDataConnector(null, "default", "test_data_src", "dt=#YYYYMMdd# AND hour=#HH#");
+ DataConnector dcTarget = createDataConnector(null, "default", "test_data_tgt", "dt=#YYYYMMdd# AND hour=#HH#");
+ GriffinMeasure measure = createGriffinMeasure(measureName, dcSource, dcTarget);
+ given(measureRepo.findByNameAndDeleted(measureName, false)).willReturn(new LinkedList<>());
+ GriffinOperationMessage message = service.createMeasure(measure);
+ assertEquals(message, CREATE_MEASURE_FAIL);
+ }
+
+ @Test
+ public void testCreateMeasureForGriffinFailureWithException() throws Exception {
+ String measureName = "view_item_hourly";
+ GriffinMeasure measure = createGriffinMeasure(measureName);
+ given(measureRepo.findByNameAndDeleted(measureName, false)).willReturn(new ArrayList<>());
+ given(measureRepo.save(Matchers.any(Measure.class))).willThrow(Exception.class);
+ GriffinOperationMessage message = service.createMeasure(measure);
+ assertEquals(message, CREATE_MEASURE_FAIL);
+ }
+
+ @Test
+ public void testCreateMeasureForExternalSuccess() throws Exception {
+ String measureName = "view_item_hourly";
+ ExternalMeasure measure = createExternalMeasure(measureName);
+ given(measureRepo.findByNameAndDeleted(measureName, false)).willReturn(new ArrayList<>());
+ given(externalMeasureRepo.save(measure)).willReturn(measure);
+ GriffinOperationMessage message = service.createMeasure(measure);
+ assertEquals(message, CREATE_MEASURE_SUCCESS);
+ }
+
+ @Test
+ public void testCreateMeasureForExternalFailureWithBlank() throws Exception {
+ String measureName = "view_item_hourly";
+ ExternalMeasure measure = createExternalMeasure(measureName);
+ measure.setMetricName(" ");
+ given(measureRepo.findByNameAndDeleted(measureName, false)).willReturn(new ArrayList<>());
+ GriffinOperationMessage message = service.createMeasure(measure);
+ assertEquals(message, CREATE_MEASURE_FAIL);
+ }
+
+ @Test
+ public void testCreateMeasureForExternalFailureWithException() throws Exception {
+ String measureName = "view_item_hourly";
+ ExternalMeasure measure = createExternalMeasure(measureName);
+ given(measureRepo.findByNameAndDeleted(measureName, false)).willReturn(new ArrayList<>());
+ given(externalMeasureRepo.save(measure)).willReturn(measure);
+ given(externalMeasureRepo.save(Matchers.any(ExternalMeasure.class))).willThrow(Exception.class);
+ GriffinOperationMessage message = service.createMeasure(measure);
+ assertEquals(message, CREATE_MEASURE_FAIL);
+ }
+
+ @Test
+ public void testCreateMeasureForFailureWithRepeat() throws Exception {
+ String measureName = "view_item_hourly";
+ GriffinMeasure measure = createGriffinMeasure(measureName);
+ given(measureRepo.findByNameAndDeleted(measureName, false)).willReturn(Arrays.asList(measure));
+ GriffinOperationMessage message = service.createMeasure(measure);
+ assertEquals(message, CREATE_MEASURE_FAIL_DUPLICATE);
}
-// @Test
-// public void testUpdateMeasureForSuccess() throws Exception {
-// Measure measure = createATestGriffinMeasure("view_item_hourly", "test");
-// given(measureRepo.findByIdAndDeleted(measure.getId(), false)).willReturn(new GriffinMeasure());
-// given(measureRepo.save(measure)).willReturn(measure);
-// GriffinOperationMessage message = service.updateMeasure(measure);
-// assertEquals(message, GriffinOperationMessage.UPDATE_MEASURE_SUCCESS);
-// }
@Test
- public void testUpdateMeasureForNotFound() throws Exception {
- Measure measure = createATestGriffinMeasure("view_item_hourly", "test");
+ public void testUpdateMeasureForGriffinSuccess() throws Exception {
+ Measure measure = createGriffinMeasure("view_item_hourly");
+ given(measureRepo.findByIdAndDeleted(measure.getId(), false)).willReturn(measure);
+ GriffinOperationMessage message = service.updateMeasure(measure);
+ assertEquals(message, UPDATE_MEASURE_SUCCESS);
+ }
+
+ @Test
+ public void testUpdateMeasureForGriffinFailureWithDiffType() throws Exception {
+ Measure griffinMeasure = createGriffinMeasure("view_item_hourly");
+ Measure externalMeasure = createExternalMeasure("externalName");
+ given(measureRepo.findByIdAndDeleted(griffinMeasure.getId(), false)).willReturn(externalMeasure);
+ GriffinOperationMessage message = service.updateMeasure(griffinMeasure);
+ assertEquals(message, UPDATE_MEASURE_FAIL);
+ }
+
+ @Test
+ public void testUpdateMeasureForGriffinFailureWithNotFound() throws Exception {
+ Measure measure = createGriffinMeasure("view_item_hourly");
given(measureRepo.findByIdAndDeleted(measure.getId(), false)).willReturn(null);
GriffinOperationMessage message = service.updateMeasure(measure);
- assertEquals(message, GriffinOperationMessage.RESOURCE_NOT_FOUND);
+ assertEquals(message, RESOURCE_NOT_FOUND);
+ }
+
+ @Test
+ public void testUpdateMeasureForGriffinFailureWithException() throws Exception {
+ Measure measure = createGriffinMeasure("view_item_hourly");
+ given(measureRepo.findByIdAndDeleted(measure.getId(), false)).willReturn(measure);
+ given(measureRepo.save(Matchers.any(Measure.class))).willThrow(Exception.class);
+ GriffinOperationMessage message = service.updateMeasure(measure);
+ assertEquals(message, UPDATE_MEASURE_FAIL);
}
-// @Test
-// public void testUpdateMeasureForFailWithSaveException() throws Exception {
-// Measure measure = createATestGriffinMeasure("view_item_hourly", "test");
-// given(measureRepo.findByIdAndDeleted(measure.getId(), false)).willReturn(new GriffinMeasure());
-// given(measureRepo.save(measure)).willThrow(Exception.class);
-// GriffinOperationMessage message = service.updateMeasure(measure);
-// assertEquals(message, GriffinOperationMessage.UPDATE_MEASURE_FAIL);
-// }
+ @Test
+ public void testUpdateMeasureForExternalSuccess() throws Exception {
+ ExternalMeasure measure = createExternalMeasure("external_view_item_hourly");
+ given(measureRepo.findByIdAndDeleted(measure.getId(), false)).willReturn(measure);
+ given(externalMeasureRepo.findOne(measure.getId())).willReturn(measure);
+ GriffinOperationMessage message = service.updateMeasure(measure);
+ assertEquals(message, UPDATE_MEASURE_SUCCESS);
+ }
+
+ @Test
+ public void testUpdateMeasureForExternalFailureWithBlank() throws Exception {
+ String measureName = "view_item_hourly";
+ ExternalMeasure measure = createExternalMeasure(measureName);
+ measure.setMetricName(" ");
+ given(measureRepo.findByIdAndDeleted(measure.getId(), false)).willReturn(measure);
+ GriffinOperationMessage message = service.updateMeasure(measure);
+ assertEquals(message, UPDATE_MEASURE_FAIL);
+ }
+
+ @Test
+ public void testUpdateMeasureForExternalFailWithException() throws Exception {
+ ExternalMeasure measure = createExternalMeasure("external_view_item_hourly");
+ given(measureRepo.findByIdAndDeleted(measure.getId(), false)).willReturn(measure);
+ given(externalMeasureRepo.findOne(measure.getId())).willReturn(measure);
+ given(externalMeasureRepo.save(Matchers.any(ExternalMeasure.class))).willThrow(Exception.class);
+ GriffinOperationMessage message = service.updateMeasure(measure);
+ assertEquals(message, GriffinOperationMessage.UPDATE_MEASURE_FAIL);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/measure/repo/DataConnectorRepoTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/measure/repo/DataConnectorRepoTest.java b/service/src/test/java/org/apache/griffin/core/measure/repo/DataConnectorRepoTest.java
new file mode 100644
index 0000000..78ac6a2
--- /dev/null
+++ b/service/src/test/java/org/apache/griffin/core/measure/repo/DataConnectorRepoTest.java
@@ -0,0 +1,80 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.measure.repo;
+
+import org.apache.griffin.core.measure.entity.DataConnector;
+import org.apache.griffin.core.measure.entity.Measure;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest;
+import org.springframework.boot.test.autoconfigure.orm.jpa.TestEntityManager;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.griffin.core.util.EntityHelper.createDataConnector;
+import static org.apache.griffin.core.util.EntityHelper.createGriffinMeasure;
+import static org.junit.Assert.*;
+
+@RunWith(SpringRunner.class)
+@DataJpaTest
+public class DataConnectorRepoTest {
+
+ @Autowired
+ private TestEntityManager entityManager;
+
+ @Autowired
+ private DataConnectorRepo dcRepo;
+
+ @Before
+ public void setup() throws Exception {
+ entityManager.clear();
+ entityManager.flush();
+ setEntityManager();
+ }
+
+ @Test
+ public void testFindByConnectorNames() throws Exception {
+ List<DataConnector> connectors = dcRepo.findByConnectorNames(Arrays.asList("name1", "name2"));
+ assertEquals(connectors.size(),2);
+ }
+
+ @Test
+ public void testFindByConnectorNamesWithNull() throws Exception {
+ List<DataConnector> connectors = dcRepo.findByConnectorNames(null);
+ assertEquals(connectors.size(),0);
+ }
+
+ public void setEntityManager() throws Exception {
+ DataConnector dc1 = createDataConnector("name1","database1","table1","/dt=#YYYYMM#");
+
+ entityManager.persistAndFlush(dc1);
+
+ DataConnector dc2 = createDataConnector("name2","database2","table2","/dt=#YYYYMM#");
+ entityManager.persistAndFlush(dc2);
+
+ DataConnector dc3 = createDataConnector("name3","database3","table3","/dt=#YYYYMM#");
+ entityManager.persistAndFlush(dc3);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java b/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
index c7132e6..3f66cdf 100644
--- a/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
+++ b/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
@@ -1,85 +1,104 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//
-//package org.apache.griffin.core.measure.repo;
-//
-//import org.apache.griffin.core.measure.entity.Measure;
-//import org.junit.Before;
-//import org.junit.Test;
-//import org.junit.runner.RunWith;
-//import org.springframework.beans.factory.annotation.Autowired;
-//import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest;
-//import org.springframework.boot.test.autoconfigure.orm.jpa.TestEntityManager;
-//import org.springframework.test.context.junit4.SpringRunner;
-//
-//import java.util.List;
-//
-//import static org.apache.griffin.core.util.EntityHelper.createATestGriffinMeasure;
-//import static org.assertj.core.api.Assertions.assertThat;
-//
-//@RunWith(SpringRunner.class)
-//@DataJpaTest
-//public class MeasureRepoTest {
-//
-// @Autowired
-// private TestEntityManager entityManager;
-//
-// @Autowired
-// private MeasureRepo measureRepo;
-//
-// @Before
-// public void setup() throws Exception {
-// entityManager.clear();
-// entityManager.flush();
-// setEntityManager();
-// }
-//
-// @Test
-// public void testFindAllOrganizations() {
-// List<String> orgs = measureRepo.findOrganizations(false);
-// assertThat(orgs.size()).isEqualTo(3);
-// }
-//
-//
-// @Test
-// public void testFindNameByOrganization() {
-// List<String> orgs = measureRepo.findNameByOrganization("org1",false);
-// assertThat(orgs.size()).isEqualTo(1);
-// assertThat(orgs.get(0)).isEqualToIgnoringCase("m1");
-//
-// }
-//
-// @Test
-// public void testFindOrgByName() {
-// String org = measureRepo.findOrgByName("m2");
-// assertThat(org).isEqualTo("org2");
-// }
-//
-//
-// public void setEntityManager() throws Exception {
-// Measure measure = createATestGriffinMeasure("m1", "org1");
-// entityManager.persistAndFlush(measure);
-//
-// Measure measure2 = createATestGriffinMeasure("m2", "org2");
-// entityManager.persistAndFlush(measure2);
-//
-// Measure measure3 = createATestGriffinMeasure("m3", "org3");
-// entityManager.persistAndFlush(measure3);
-// }
-//}
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.measure.repo;
+
+import org.apache.griffin.core.measure.entity.Measure;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest;
+import org.springframework.boot.test.autoconfigure.orm.jpa.TestEntityManager;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.List;
+
+import static org.apache.griffin.core.util.EntityHelper.createGriffinMeasure;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(SpringRunner.class)
+@DataJpaTest
+public class MeasureRepoTest {
+
+ @Autowired
+ private TestEntityManager entityManager;
+
+ @Autowired
+ private MeasureRepo measureRepo;
+
+ @Before
+ public void setup() throws Exception {
+ entityManager.clear();
+ entityManager.flush();
+ setEntityManager();
+ }
+
+ @Test
+ public void testFindByNameAndDeleted() {
+ String name = "m1";
+ List<Measure> measures = measureRepo.findByNameAndDeleted(name, false);
+ assertThat(measures.get(0).getName()).isEqualTo(name);
+ }
+
+ @Test
+ public void testFindByDeleted() {
+ List<Measure> measures = measureRepo.findByDeleted(false);
+ assertThat(measures.size()).isEqualTo(3);
+ }
+
+ @Test
+ public void testFindByOwnerAndDeleted() {
+ List<Measure> measures = measureRepo.findByOwnerAndDeleted("test", false);
+ assertThat(measures.size()).isEqualTo(2);
+ }
+
+ @Test
+ public void testFindByIdAndDeleted() {
+ Measure measure = measureRepo.findByIdAndDeleted(1L, true);
+ assertThat(measure).isNull();
+ }
+
+ @Test
+ public void testFindOrganizations() {
+ List<String> organizations = measureRepo.findOrganizations(false);
+ assertThat(organizations.size()).isEqualTo(3);
+ }
+
+ @Test
+ public void testFindNameByOrganization() {
+ List<String> names = measureRepo.findNameByOrganization("org1", false);
+ assertThat(names.size()).isEqualTo(1);
+ }
+
+ public void setEntityManager() throws Exception {
+ Measure measure1 = createGriffinMeasure("m1");
+ measure1.setOrganization("org1");
+ entityManager.persistAndFlush(measure1);
+
+ Measure measure2 = createGriffinMeasure("m2");
+ measure2.setOrganization("org2");
+ entityManager.persistAndFlush(measure2);
+
+ Measure measure3 = createGriffinMeasure("m3");
+ measure3.setOrganization("org3");
+ measure3.setOwner("owner");
+ entityManager.persistAndFlush(measure3);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/util/EntityHelper.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/util/EntityHelper.java b/service/src/test/java/org/apache/griffin/core/util/EntityHelper.java
index 627325c..1805608 100644
--- a/service/src/test/java/org/apache/griffin/core/util/EntityHelper.java
+++ b/service/src/test/java/org/apache/griffin/core/util/EntityHelper.java
@@ -20,29 +20,39 @@ under the License.
package org.apache.griffin.core.util;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.griffin.core.job.entity.*;
import org.apache.griffin.core.measure.entity.*;
-import org.codehaus.jackson.map.ObjectMapper;
import org.quartz.JobDataMap;
-import org.quartz.Trigger;
+import org.quartz.JobKey;
+import org.quartz.SimpleTrigger;
import org.quartz.impl.JobDetailImpl;
+import org.quartz.impl.triggers.SimpleTriggerImpl;
-import java.io.Serializable;
+import java.io.IOException;
import java.util.*;
+import static org.apache.griffin.core.job.JobInstance.*;
+import static org.apache.griffin.core.job.JobServiceImpl.GRIFFIN_JOB_ID;
+import static org.apache.griffin.core.job.JobServiceImpl.JOB_SCHEDULE_ID;
+import static org.apache.hadoop.mapreduce.MRJobConfig.JOB_NAME;
+
public class EntityHelper {
- public static GriffinMeasure createATestGriffinMeasure(String name, String org) throws Exception {
- HashMap<String, String> configMap1 = new HashMap<>();
- configMap1.put("database", "default");
- configMap1.put("table.name", "test_data_src");
- HashMap<String, String> configMap2 = new HashMap<>();
- configMap2.put("database", "default");
- configMap2.put("table.name", "test_data_tgt");
- String configJson1 = new ObjectMapper().writeValueAsString(configMap1);
- String configJson2 = new ObjectMapper().writeValueAsString(configMap2);
-
- DataSource dataSource = new DataSource("source", Arrays.asList(new DataConnector("source_name", "HIVE", "1.2", configJson1)));
- DataSource targetSource = new DataSource("target", Arrays.asList(new DataConnector("target-name", "HIVE", "1.2", configJson2)));
+ public static GriffinMeasure createGriffinMeasure(String name) throws Exception {
+ DataConnector dcSource = createDataConnector("source_name", "default", "test_data_src", "dt=#YYYYMMdd# AND hour=#HH#");
+ DataConnector dcTarget = createDataConnector("target_name", "default", "test_data_tgt", "dt=#YYYYMMdd# AND hour=#HH#");
+ return createGriffinMeasure(name, dcSource, dcTarget);
+ }
+
+ public static GriffinMeasure createGriffinMeasure(String name, SegmentPredicate srcPredicate, SegmentPredicate tgtPredicate) throws Exception {
+ DataConnector dcSource = createDataConnector("source_name", "default", "test_data_src", "dt=#YYYYMMdd# AND hour=#HH#", srcPredicate);
+ DataConnector dcTarget = createDataConnector("target_name", "default", "test_data_tgt", "dt=#YYYYMMdd# AND hour=#HH#", tgtPredicate);
+ return createGriffinMeasure(name, dcSource, dcTarget);
+ }
+ public static GriffinMeasure createGriffinMeasure(String name, DataConnector dcSource, DataConnector dcTarget) throws Exception {
+ DataSource dataSource = new DataSource("source", Arrays.asList(dcSource));
+ DataSource targetSource = new DataSource("target", Arrays.asList(dcTarget));
List<DataSource> dataSources = new ArrayList<>();
dataSources.add(dataSource);
dataSources.add(targetSource);
@@ -51,39 +61,112 @@ public class EntityHelper {
map.put("detail", "detail info");
Rule rule = new Rule("griffin-dsl", "accuracy", rules, map);
EvaluateRule evaluateRule = new EvaluateRule(Arrays.asList(rule));
- return new GriffinMeasure(1L,name, "description", org, "batch", "test", dataSources, evaluateRule);
+ return new GriffinMeasure(name, "test", dataSources, evaluateRule);
+ }
+
+ public static DataConnector createDataConnector(String name, String database, String table, String where) throws IOException {
+ HashMap<String, String> config = new HashMap<>();
+ config.put("database", database);
+ config.put("table.name", table);
+ config.put("where", where);
+ return new DataConnector(name, "1h", config, null);
+ }
+
+ public static DataConnector createDataConnector(String name, String database, String table, String where, SegmentPredicate predicate) throws IOException {
+ HashMap<String, String> config = new HashMap<>();
+ config.put("database", database);
+ config.put("table.name", table);
+ config.put("where", where);
+ return new DataConnector(name, "1h", config, Arrays.asList(predicate));
}
- public static JobDetailImpl createJobDetail() {
+ public static ExternalMeasure createExternalMeasure(String name) {
+ return new ExternalMeasure(name, "description", "org", "test", "metricName", new VirtualJob());
+ }
+
+ public static JobSchedule createJobSchedule() throws JsonProcessingException {
+ return createJobSchedule("jobName");
+ }
+
+ public static JobSchedule createJobSchedule(String jobName) throws JsonProcessingException {
+ JobDataSegment segment1 = createJobDataSegment("source_name", true);
+ JobDataSegment segment2 = createJobDataSegment("target_name", false);
+ List<JobDataSegment> segments = new ArrayList<>();
+ segments.add(segment1);
+ segments.add(segment2);
+ return new JobSchedule(1L, jobName, "0 0/4 * * * ?", "GMT+8:00", segments);
+ }
+
+ public static JobSchedule createJobSchedule(String jobName, SegmentRange range) throws JsonProcessingException {
+ JobDataSegment segment1 = createJobDataSegment("source_name", true, range);
+ JobDataSegment segment2 = createJobDataSegment("target_name", false, range);
+ List<JobDataSegment> segments = new ArrayList<>();
+ segments.add(segment1);
+ segments.add(segment2);
+ return new JobSchedule(1L, jobName, "0 0/4 * * * ?", "GMT+8:00", segments);
+ }
+
+ public static JobSchedule createJobSchedule(String jobName, JobDataSegment source, JobDataSegment target) throws JsonProcessingException {
+ List<JobDataSegment> segments = new ArrayList<>();
+ segments.add(source);
+ segments.add(target);
+ return new JobSchedule(1L, jobName, "0 0/4 * * * ?", "GMT+8:00", segments);
+ }
+
+ public static JobDataSegment createJobDataSegment(String dataConnectorName, Boolean baseline, SegmentRange range) {
+ return new JobDataSegment(dataConnectorName, baseline, range);
+ }
+
+ public static JobDataSegment createJobDataSegment(String dataConnectorName, Boolean baseline) {
+ return new JobDataSegment(dataConnectorName, baseline);
+ }
+
+ public static JobInstanceBean createJobInstance() {
+ JobInstanceBean jobBean = new JobInstanceBean();
+ jobBean.setSessionId(1L);
+ jobBean.setState(LivySessionStates.State.starting);
+ jobBean.setAppId("app_id");
+ jobBean.setTms(System.currentTimeMillis());
+ return jobBean;
+ }
+
+ public static JobDetailImpl createJobDetail(String measureJson, String predicatesJson) {
JobDetailImpl jobDetail = new JobDetailImpl();
- JobDataMap jobInfoMap = new JobDataMap();
- jobInfoMap.put("triggerState", Trigger.TriggerState.NORMAL);
- jobInfoMap.put("measureId", "1");
- jobInfoMap.put("sourcePattern", "YYYYMMdd-HH");
- jobInfoMap.put("targetPattern", "YYYYMMdd-HH");
- jobInfoMap.put("jobStartTime", "1506356105876");
- jobInfoMap.put("interval", "3000");
- jobInfoMap.put("deleted", "false");
- jobInfoMap.put("blockStartTimestamp", "1506634804254");
- jobInfoMap.put("lastBlockStartTimestamp", "1506634804254");
- jobInfoMap.put("groupName", "BA");
- jobInfoMap.put("jobName", "jobName");
- jobDetail.setJobDataMap(jobInfoMap);
+ JobKey jobKey = new JobKey("name", "group");
+ jobDetail.setKey(jobKey);
+ JobDataMap jobDataMap = new JobDataMap();
+ jobDataMap.put(MEASURE_KEY, measureJson);
+ jobDataMap.put(PREDICATES_KEY, predicatesJson);
+ jobDataMap.put(JOB_NAME, "jobName");
+ jobDataMap.put(PREDICATE_JOB_NAME, "predicateJobName");
+ jobDataMap.put(JOB_SCHEDULE_ID, 1L);
+ jobDataMap.put(GRIFFIN_JOB_ID, 1L);
+ jobDetail.setJobDataMap(jobDataMap);
return jobDetail;
}
+ public static SegmentPredicate createFileExistPredicate() throws JsonProcessingException {
+ Map<String, String> config = new HashMap<>();
+ config.put("root.path", "hdfs:///griffin/demo_src");
+ config.put("path", "/dt=#YYYYMMdd#/hour=#HH#/_DONE");
+ return new SegmentPredicate("file.exist", config);
+ }
+
public static Map<String, Object> createJobDetailMap() {
- Map<String, Object> jobDetailMap = new HashMap<>();
- jobDetailMap.put("jobName", "jobName");
- jobDetailMap.put("measureId", "1");
- jobDetailMap.put("groupName", "BA");
- jobDetailMap.put("targetPattern", "YYYYMMdd-HH");
- jobDetailMap.put("triggerState", Trigger.TriggerState.NORMAL);
- jobDetailMap.put("nextFireTime", "1509613440000");
- jobDetailMap.put("previousFireTime", "1509613410000");
- jobDetailMap.put("interval", "3000");
- jobDetailMap.put("sourcePattern", "YYYYMMdd-HH");
- jobDetailMap.put("jobStartTime", "1506356105876");
- return jobDetailMap;
+ Map<String, Object> detail = new HashMap<>();
+ detail.put("jobId", 1L);
+ detail.put("jobName", "jobName");
+ detail.put("measureId", 1L);
+ detail.put("cronExpression", "0 0/4 * * * ?");
+ return detail;
}
+
+ public static SimpleTrigger createSimpleTrigger(int repeatCount, int triggerCount) {
+ SimpleTriggerImpl trigger = new SimpleTriggerImpl();
+ trigger.setRepeatCount(repeatCount);
+ trigger.setTimesTriggered(triggerCount);
+ trigger.setPreviousFireTime(new Date());
+ return trigger;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/util/GriffinUtilTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/util/GriffinUtilTest.java b/service/src/test/java/org/apache/griffin/core/util/GriffinUtilTest.java
deleted file mode 100644
index f1563d1..0000000
--- a/service/src/test/java/org/apache/griffin/core/util/GriffinUtilTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.griffin.core.util;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.griffin.core.job.entity.JobHealth;
-import org.junit.Before;
-import org.junit.Test;
-import org.springframework.core.io.ClassPathResource;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-
-public class GriffinUtilTest {
-
- @Before
- public void setup() {
- }
-
- @Test
- public void testToJson() throws JsonProcessingException {
- JobHealth jobHealth = new JobHealth(5, 10);
- String jobHealthStr = JsonUtil.toJson(jobHealth);
- System.out.println(jobHealthStr);
- assertEquals(jobHealthStr, "{\"healthyJobCount\":5,\"jobCount\":10}");
- }
-
- @Test
- public void testToEntityWithParamClass() throws IOException {
- String str = "{\"healthyJobCount\":5,\"jobCount\":10}";
- JobHealth jobHealth = JsonUtil.toEntity(str, JobHealth.class);
- assertEquals(jobHealth.getJobCount(), 10);
- assertEquals(jobHealth.getHealthyJobCount(), 5);
- }
-
- @Test
- public void testToEntityWithParamTypeReference() throws IOException {
- String str = "{\"aaa\":12, \"bbb\":13}";
- TypeReference<HashMap<String, Integer>> type = new TypeReference<HashMap<String, Integer>>() {
- };
- Map map = JsonUtil.toEntity(str, type);
- assertEquals(map.get("aaa"), 12);
- }
-
- @Test
- public void testGetPropertiesForSuccess() {
- String path = "/quartz.properties";
- Properties properties = PropertiesUtil.getProperties(path, new ClassPathResource(path));
- assertEquals(properties.get("org.quartz.jobStore.isClustered"), "true");
- }
-
- @Test
- public void testGetPropertiesForFailWithWrongPath() {
- String path = ".././quartz.properties";
- Properties properties = PropertiesUtil.getProperties(path, new ClassPathResource(path));
- assertEquals(properties, null);
- }
-
- @Test
- public void testToJsonWithFormat() throws JsonProcessingException {
- JobHealth jobHealth = new JobHealth(5, 10);
- String jobHealthStr = JsonUtil.toJsonWithFormat(jobHealth);
- System.out.println(jobHealthStr);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/util/JsonUtilTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/util/JsonUtilTest.java b/service/src/test/java/org/apache/griffin/core/util/JsonUtilTest.java
new file mode 100644
index 0000000..baa20a9
--- /dev/null
+++ b/service/src/test/java/org/apache/griffin/core/util/JsonUtilTest.java
@@ -0,0 +1,84 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.util;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.griffin.core.job.entity.JobHealth;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+public class JsonUtilTest {
+
+ @Test
+ public void testToJson() throws JsonProcessingException {
+ JobHealth jobHealth = new JobHealth(5, 10);
+ String jobHealthStr = JsonUtil.toJson(jobHealth);
+ System.out.println(jobHealthStr);
+ assertEquals(jobHealthStr, "{\"healthyJobCount\":5,\"jobCount\":10}");
+ }
+
+ @Test
+ public void testToJsonWithFormat() throws JsonProcessingException {
+ JobHealth jobHealth = new JobHealth(5, 10);
+ String jobHealthStr = JsonUtil.toJsonWithFormat(jobHealth);
+ System.out.println(jobHealthStr);
+ }
+
+ @Test
+ public void testToEntityWithParamClass() throws IOException {
+ String str = "{\"healthyJobCount\":5,\"jobCount\":10}";
+ JobHealth jobHealth = JsonUtil.toEntity(str, JobHealth.class);
+ assertEquals(jobHealth.getJobCount(), 10);
+ assertEquals(jobHealth.getHealthyJobCount(), 5);
+ }
+
+ @Test
+ public void testToEntityWithNullParamClass() throws IOException {
+ String str = null;
+ JobHealth jobHealth = JsonUtil.toEntity(str, JobHealth.class);
+ assert jobHealth == null;
+ }
+
+ @Test
+ public void testToEntityWithParamTypeReference() throws IOException {
+ String str = "{\"aaa\":12, \"bbb\":13}";
+ TypeReference<HashMap<String, Integer>> type = new TypeReference<HashMap<String, Integer>>() {
+ };
+ Map map = JsonUtil.toEntity(str, type);
+ assertEquals(map.get("aaa"), 12);
+ }
+
+ @Test
+ public void testToEntityWithNullParamTypeReference() throws IOException {
+ String str = null;
+ TypeReference<HashMap<String, Integer>> type = new TypeReference<HashMap<String, Integer>>() {
+ };
+ Map map = JsonUtil.toEntity(str, type);
+ assert map == null;
+ }
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/util/PropertiesUtilTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/util/PropertiesUtilTest.java b/service/src/test/java/org/apache/griffin/core/util/PropertiesUtilTest.java
new file mode 100644
index 0000000..ca57369
--- /dev/null
+++ b/service/src/test/java/org/apache/griffin/core/util/PropertiesUtilTest.java
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.util;
+
+import org.junit.Test;
+import org.springframework.core.io.ClassPathResource;
+
+import java.util.Properties;
+
+import static org.junit.Assert.*;
+
+public class PropertiesUtilTest {
+
+ @Test
+ public void testGetPropertiesForSuccess() {
+ String path = "/quartz.properties";
+ Properties properties = PropertiesUtil.getProperties(path, new ClassPathResource(path));
+ assertEquals(properties.get("org.quartz.jobStore.isClustered"), "true");
+ }
+
+ @Test
+ public void testGetPropertiesForFailureWithWrongPath() {
+ String path = ".././quartz.properties";
+ Properties properties = PropertiesUtil.getProperties(path, new ClassPathResource(path));
+ assertEquals(properties, null);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/util/TimeUtilTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/util/TimeUtilTest.java b/service/src/test/java/org/apache/griffin/core/util/TimeUtilTest.java
index 02c7320..b215f93 100644
--- a/service/src/test/java/org/apache/griffin/core/util/TimeUtilTest.java
+++ b/service/src/test/java/org/apache/griffin/core/util/TimeUtilTest.java
@@ -20,8 +20,102 @@ under the License.
package org.apache.griffin.core.util;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.test.context.junit4.SpringRunner;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(SpringRunner.class)
public class TimeUtilTest {
+ @Test
+ public void testStr2LongWithPositive() throws Exception {
+ String time = "2h3m4s";
+ assertEquals(String.valueOf(TimeUtil.str2Long(time)), "7384000");
+ }
+
+ @Test
+ public void testStr2LongWithNegative() throws Exception {
+ String time = "-2h3m4s";
+ assertEquals(String.valueOf(TimeUtil.str2Long(time)), "-7384000");
+ }
+
+ @Test
+ public void testStr2LongWithNull() throws Exception {
+ String time = null;
+ assertEquals(String.valueOf(TimeUtil.str2Long(time)), "0");
+ }
+
+ @Test
+ public void testStr2LongWithDay() throws Exception {
+ String time = "1d";
+ System.out.println(TimeUtil.str2Long(time));
+ assertEquals(String.valueOf(TimeUtil.str2Long(time)), "86400000");
+ }
+ @Test
+ public void testStr2LongWithHour() throws Exception {
+ String time = "1h";
+ assertEquals(String.valueOf(TimeUtil.str2Long(time)), "3600000");
+ }
+
+ @Test
+ public void testStr2LongWithMinute() throws Exception {
+ String time = "1m";
+ assertEquals(String.valueOf(TimeUtil.str2Long(time)), "60000");
+ }
+
+ @Test
+ public void testStr2LongWithSecond() throws Exception {
+ String time = "1s";
+ assertEquals(String.valueOf(TimeUtil.str2Long(time)), "1000");
+ }
+
+ @Test
+ public void testStr2LongWithMillisecond() throws Exception {
+ String time = "1ms";
+ assertEquals(String.valueOf(TimeUtil.str2Long(time)), "1");
+ }
+
+ @Test
+ public void testStr2LongWithIllegalFormat() throws Exception {
+ String time = "1y2m3s";
+ assertEquals(String.valueOf(TimeUtil.str2Long(time)), "123000");
+ }
+
+ @Test
+ public void testFormat() throws Exception {
+ String format = "dt=#YYYYMMdd#";
+ Long time = 1516186620155L;
+ String timeZone = "GMT+8:00";
+ assertEquals(TimeUtil.format(format,time,timeZone),"dt=20180117");
+ }
+
+ @Test
+ public void testFormatWithDiff() throws Exception {
+ String format = "dt=#YYYYMMdd#/hour=#HH#";
+ Long time = 1516186620155L;
+ String timeZone = "GMT+8:00";
+ assertEquals(TimeUtil.format(format,time,timeZone),"dt=20180117/hour=18");
+ }
+
+ @Test
+ public void testFormatWithIllegalException() throws Exception {
+ String format = "\\#YYYYMMdd\\#";
+ Long time = 1516186620155L;
+ String timeZone = "GMT+8:00";
+ IllegalArgumentException exception = formatException(format, time,timeZone);
+ assert exception != null;
+ }
+
+ private IllegalArgumentException formatException(String format,Long time,String timeZone) {
+ IllegalArgumentException exception = null;
+ try {
+ TimeUtil.format(format,time,timeZone);
+ } catch (IllegalArgumentException e) {
+ exception = e;
+ }
+ return exception;
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/resources/application.properties
----------------------------------------------------------------------
diff --git a/service/src/test/resources/application.properties b/service/src/test/resources/application.properties
index f303911..ebd6a41 100644
--- a/service/src/test/resources/application.properties
+++ b/service/src/test/resources/application.properties
@@ -17,45 +17,54 @@
# under the License.
#
-# spring.datasource.x
-
-spring.datasource.driver-class-name=org.h2.Driver
-spring.datasource.url=jdbc:h2:mem:db;DB_CLOSE_DELAY=-1
-spring.datasource.username=sa
-spring.datasource.password=sa
-#spring.datasource.url= jdbc:mysql://localhost:3306/quartz?autoReconnect=true&useSSL=false
-#spring.datasource.username =griffin
-#spring.datasource.password =123456
-
-# hibernate.X
-hibernate.dialect=org.hibernate.dialect.H2Dialect
-#spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.H2Dialect
-
-hibernate.show_sql=true
-spring.jpa.hibernate.ddl-auto = create-drop
-
-#hibernate.hbm2ddl.auto=create-drop
-hibernate.cache.use_second_level_cache=true
-hibernate.cache.use_query_cache=true
-hibernate.cache.region.factory_class=org.hibernate.cache.ehcache.EhCacheRegionFactory
-
-# hive metastore
-hive.metastore.uris = thrift://10.9.246.187:9083
+spring.datasource.url = jdbc:mysql://localhost:3306/quartz?autoReconnect=true&useSSL=false
+spring.datasource.username = test
+spring.datasource.password = test
+spring.datasource.driver-class-name = com.mysql.jdbc.Driver
+
+# Hibernate ddl auto (validate, create, create-drop, update)
+spring.jpa.hibernate.ddl-auto = update
+spring.jpa.show-sql = true
+spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
+# Naming strategy
+spring.jpa.hibernate.naming-strategy = org.hibernate.cfg.ImprovedNamingStrategy
+
+# Hive metastore
+hive.metastore.uris = thrift://localhost:9083
hive.metastore.dbname = default
hive.hmshandler.retry.attempts = 15
hive.hmshandler.retry.interval = 2000ms
+# Hive cache time
+cache.evict.hive.fixedRate.in.milliseconds = 900000
+
+# Kafka schema registry
+kafka.schema.registry.url = http://localhost:8081
+
+# Update job instance state at regular intervals
+jobInstance.fixedDelay.in.milliseconds = 60000
+# Expired time of job instance which is 7 days that is 604800000 milliseconds.Time unit only supports milliseconds
+jobInstance.expired.milliseconds = 604800000
+
+# schedule predicate job every 5 minutes and repeat 12 times at most
+#interval time unit m:minute h:hour d:day,only support these three units
+predicate.job.interval = 5m
+predicate.job.repeat.count = 12
+
+# external properties directory location
+external.config.location =
-# kafka schema registry
-kafka.schema.registry.url = http://10.65.159.119:8081
+# login strategy ("test" or "ldap")
+login.strategy = test
-#logging level
-logging.level.root=ERROR
-logging.level.org.hibernate=ERROR
-logging.level.org.springframework.test=ERROR
-logging.level.org.apache.griffin=ERROR
-logging.file=target/test.log
+# ldap
+ldap.url = ldap://hostname:port
+ldap.email = @example.com
+ldap.searchBase = DC=org,DC=example
+ldap.searchPattern = (sAMAccountName={0})
+# hdfs
+fs.defaultFS = hdfs://hdfs-default-name
-jobInstance.fixedDelay.in.milliseconds=60000
-# spring cache
-cache.evict.hive.fixedRate.in.milliseconds=900000
\ No newline at end of file
+# elasticsearch
+elasticsearch.host = localhost
+elasticsearch.port = 9200
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/resources/quartz.properties
----------------------------------------------------------------------
diff --git a/service/src/test/resources/quartz.properties b/service/src/test/resources/quartz.properties
index 640f067..3cb7158 100644
--- a/service/src/test/resources/quartz.properties
+++ b/service/src/test/resources/quartz.properties
@@ -17,7 +17,7 @@
# under the License.
#
-org.quartz.scheduler.instanceName=spring-boot-quartz
+org.quartz.scheduler.instanceName=spring-boot-quartz-test
org.quartz.scheduler.instanceId=AUTO
org.quartz.threadPool.threadCount=5
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/resources/sparkJob.properties
----------------------------------------------------------------------
diff --git a/service/src/test/resources/sparkJob.properties b/service/src/test/resources/sparkJob.properties
new file mode 100644
index 0000000..4b36826
--- /dev/null
+++ b/service/src/test/resources/sparkJob.properties
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# spark required
+sparkJob.file=hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/jar/griffin-measure.jar
+sparkJob.className=org.apache.griffin.measure.Application
+sparkJob.args_1=hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/conf/env.json
+sparkJob.args_3=hdfs,raw
+
+sparkJob.name=test
+sparkJob.queue=hdlq-gdi-sla
+
+# options
+sparkJob.numExecutors=10
+sparkJob.executorCores=1
+sparkJob.driverMemory=2g
+sparkJob.executorMemory=2g
+
+# shouldn't config in server, but in
+sparkJob.jars = hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/livy/spark-avro_2.11-2.0.1.jar;\
+ hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/livy/datanucleus-api-jdo-3.2.6.jar;\
+ hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/livy/datanucleus-core-3.2.10.jar;\
+ hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/livy/datanucleus-rdbms-3.2.9.jar
+
+spark.yarn.dist.files = hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/livy/hive-site.xml
+
+# livy
+# livy.uri=http://10.9.246.187:8998/batches
+livy.uri=http://localhost:8998/batches
+
+# spark-admin
+# spark.uri=http://10.149.247.156:28088
+# spark.uri=http://10.9.246.187:8088
+spark.uri=http://localhost:8088
\ No newline at end of file
[3/3] incubator-griffin git commit: fix transaction and schedule bug
and update ut
Posted by gu...@apache.org.
fix transaction and schedule bug and update ut
Fix Bugs:
1.fix transaction rollback failure bug
2.job schedule range null bug
3.fix time fromat without timezone bug
Author: ahutsunshine <ah...@gmail.com>
Author: He Wang <wa...@qq.com>
Closes #195 from ahutsunshine/master.
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/71fcf93b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/71fcf93b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/71fcf93b
Branch: refs/heads/master
Commit: 71fcf93b99f1401d14e5baeb1f6137c6891d96fc
Parents: 87e59a5
Author: ahutsunshine <ah...@gmail.com>
Authored: Mon Jan 22 09:51:54 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Mon Jan 22 09:51:54 2018 +0800
----------------------------------------------------------------------
README.md | 31 +-
griffin-doc/service/postman/griffin.json | 68 +-
.../griffin/core/config/PropertiesConfig.java | 4 +-
.../griffin/core/job/FileExistPredicator.java | 4 +-
.../apache/griffin/core/job/JobInstance.java | 12 +-
.../apache/griffin/core/job/JobServiceImpl.java | 65 +-
.../griffin/core/job/entity/GriffinJob.java | 6 +-
.../griffin/core/job/entity/JobDataSegment.java | 10 +-
.../griffin/core/job/entity/JobSchedule.java | 8 +-
.../core/job/entity/SegmentPredicate.java | 5 +
.../griffin/core/job/entity/SegmentRange.java | 8 +
.../measure/ExternalMeasureOperationImpl.java | 6 +-
.../measure/GriffinMeasureOperationImpl.java | 19 +-
.../griffin/core/measure/MeasureOperation.java | 2 +-
.../griffin/core/measure/MeasureOrgService.java | 1 -
.../core/measure/MeasureServiceImpl.java | 11 +-
.../core/measure/entity/ExternalMeasure.java | 3 +-
.../core/measure/entity/GriffinMeasure.java | 20 +-
.../griffin/core/measure/entity/Measure.java | 4 +-
.../griffin/core/measure/entity/Rule.java | 8 +-
.../griffin/core/measure/repo/MeasureRepo.java | 10 -
.../org/apache/griffin/core/util/AvroUtil.java | 30 -
.../org/apache/griffin/core/util/TimeUtil.java | 44 +-
.../src/main/resources/application.properties | 4 +-
service/src/main/resources/sparkJob.properties | 16 +-
.../core/config/PropertiesConfigTest.java | 160 ++++
.../griffin/core/job/JobInstanceTest.java | 186 ++++
.../griffin/core/job/JobServiceImplTest.java | 934 +++++++++++--------
.../griffin/core/job/SparkSubmitJobTest.java | 179 +++-
.../griffin/core/job/repo/JobRepoTest.java | 94 ++
.../core/measure/MeasureControllerTest.java | 24 +-
.../core/measure/MeasureOrgServiceImplTest.java | 212 +++--
.../core/measure/MeasureServiceImplTest.java | 336 +++++--
.../measure/repo/DataConnectorRepoTest.java | 80 ++
.../core/measure/repo/MeasureRepoTest.java | 189 ++--
.../apache/griffin/core/util/EntityHelper.java | 167 +++-
.../griffin/core/util/GriffinUtilTest.java | 87 --
.../apache/griffin/core/util/JsonUtilTest.java | 84 ++
.../griffin/core/util/PropertiesUtilTest.java | 45 +
.../apache/griffin/core/util/TimeUtilTest.java | 94 ++
.../src/test/resources/application.properties | 79 +-
service/src/test/resources/quartz.properties | 2 +-
service/src/test/resources/sparkJob.properties | 50 +
43 files changed, 2332 insertions(+), 1069 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 452b3f2..13a52eb 100644
--- a/README.md
+++ b/README.md
@@ -18,7 +18,8 @@ under the License.
-->
-## Apache Griffin
+## Apache Griffin
+[![Build Status](https://travis-ci.org/apache/incubator-griffin.svg?branch=master)](https://travis-ci.org/apache/incubator-griffin) [![License: Apache 2.0](https://camo.githubusercontent.com/8cb994f6c4a156c623fe057fccd7fb7d7d2e8c9b/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f6c6963656e73652d417061636865253230322d3445423142412e737667)](https://www.apache.org/licenses/LICENSE-2.0.html)
Apache Griffin is a model driven data quality solution for modern data systems.
It provides a standard process to define data quality measures, execute, report, as well as an unified dashboard across multiple data systems.
@@ -27,16 +28,7 @@ You can access our wiki page [here](https://cwiki.apache.org/confluence/display/
You can access our issues jira page [here](https://issues.apache.org/jira/secure/Dashboard.jspa?selectPageId=12330914).
### Contact us
-[Dev List](mailto://dev@griffin.incubator.apache.org)
-
-
-### CI
-
-
-### Repository
-Snapshot:
-
-Release:
+Email: <a href="mailto:dev@griffin.incubator.apache.org">dev@griffin.incubator.apache.org</a>
### How to run in docker
1. Install [docker](https://docs.docker.com/engine/installation/) and [docker compose](https://docs.docker.com/compose/install/).
@@ -59,10 +51,10 @@ Release:
```
docker-compose -f docker-compose-batch.yml up -d
```
-6. Now you can try griffin APIs by using postman after importing the [json files](https://github.com/apache/incubator-griffin/blob/master/griffin-doc/postman).
+6. Now you can try griffin APIs by using postman after importing the [json files](https://github.com/apache/incubator-griffin/tree/master/griffin-doc/service/postman).
In which you need to modify the environment `BASE_PATH` value into `<your local IP address>:38080`.
-More details about griffin docker [here](https://github.com/apache/incubator-griffin/blob/master/griffin-doc/griffin-docker-guide.md).
+More details about griffin docker [here](https://github.com/apache/incubator-griffin/blob/master/griffin-doc/docker/griffin-docker-guide.md).
### How to deploy and run at local
1. Install jdk (1.8 or later versions).
@@ -124,12 +116,23 @@ More details about griffin docker [here](https://github.com/apache/incubator-gri
```
http://<your IP>:8080
```
-11. Follow the steps using UI [here](https://github.com/apache/incubator-griffin/blob/master/griffin-doc/dockerUIguide.md#webui-test-case-guide).
+11. Follow the steps using UI [here](https://github.com/apache/incubator-griffin/blob/master/griffin-doc/ui/dockerUIguide.md#webui-test-case-guide).
**Note**: The front-end UI is still under development, you can only access some basic features currently.
+### Document List
+- [Wiki](https://cwiki.apache.org/confluence/display/GRIFFIN/Apache+Griffin)
+- [Measure](https://github.com/apache/incubator-griffin/tree/master/griffin-doc/measure)
+- [Service](https://github.com/apache/incubator-griffin/tree/master/griffin-doc/service)
+- [UI](https://github.com/apache/incubator-griffin/tree/master/griffin-doc/ui)
+- [Docker usage](https://github.com/apache/incubator-griffin/tree/master/griffin-doc/docker)
+- [Postman API](https://github.com/apache/incubator-griffin/tree/master/griffin-doc/service/postman)
### Contributing
See [CONTRIBUTING.md](CONTRIBUTING.md) for details on how to contribute code, documentation, etc.
+
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/griffin-doc/service/postman/griffin.json
----------------------------------------------------------------------
diff --git a/griffin-doc/service/postman/griffin.json b/griffin-doc/service/postman/griffin.json
index 88a220a..fdd172a 100644
--- a/griffin-doc/service/postman/griffin.json
+++ b/griffin-doc/service/postman/griffin.json
@@ -376,7 +376,7 @@
"tests": null,
"currentHelper": "normal",
"helperAttributes": {},
- "time": 1515399022575,
+ "time": 1516341608732,
"name": "Update measure",
"description": "`PUT /api/v1/measures`\n\n#### Request Header\nkey | value\n--- | ---\nContent-Type | application/json\n\n#### Request Body\n\nname | description | type\n--- | --- | --- \nmeasure | measure entity | Measure\n\nThere are two different measures that are griffin measure and external measure.\nIf you want to update an external measure,you can use following example json in request body.\n```\n{\n\t\"id\":1,\n \"type\": \"external\",\n \"name\": \"external_name\",\n \"description\": \" update test measure\",\n \"organization\": \"orgName\",\n \"owner\": \"test\",\n \"metricName\": \"metricName\"\n}\n```\nPostman gives a griffin measure example in request body and response body. \n#### Response Body Sample\n```\n{\n \"code\": 204,\n \"description\": \"Update Measure Succeed\"\n}\n```\n\nIt may return failed messages.Such as,\n\n```\n {\n \"code\": 400,\n \"description\": \"Resource Not Found\"\n}\n\n```\n\nThe reason for failure may be that measure
id doesn't exist.",
"collectionId": "a743e1b9-583f-6bd7-e2ae-f03a1f807c63",
@@ -462,14 +462,13 @@
}
],
"headers": "Content-Type: application/json\n",
- "data": "{\n \"id\": 1,\n \"name\": \"measure_official_update\",\n \"description\": \"create a measure\",\n \"organization\": \"test\",\n \"owner\": \"test\",\n \"deleted\": false,\n \"type\": \"griffin\",\n \"process.type\": \"batch\",\n \"data.sources\": [\n {\n \"id\": 1,\n \"name\": \"source\",\n \"connectors\": [\n {\n \"id\": 1,\n \"name\": \"connector_name_source\",\n \"type\": \"HIVE\",\n \"version\": \"1.2\",\n \"predicates\": [],\n \"data.unit\": \"1h\",\n \"config\": {\n \"database\": \"default\",\n \"table.name\": \"demo_src\",\n \"where\": \"dt=#YYYYMMdd# AND hour=#HH#\"\n
}\n }\n ]\n },\n {\n \"id\": 2,\n \"name\": \"target\",\n \"connectors\": [\n {\n \"id\": 2,\n \"name\": \"connector_name_target\",\n \"type\": \"HIVE\",\n \"version\": \"1.2\",\n \"predicates\": [],\n \"data.unit\": \"1h\",\n \"config\": {\n \"database\": \"default\",\n \"table.name\": \"demo_src\",\n \"where\": \"dt=#YYYYMMdd# AND hour=#HH#\"\n }\n }\n ]\n }\n ],\n \"evaluate.rule\": {\n \"id\": 1,\n \"rules\": [\n {\n \"id\": 1,\n \"rule\": \"source.desc=target.desc
\",\n \"dsl.type\": \"griffin-dsl\",\n \"dq.type\": \"accuracy\",\n \"details\": {}\n }\n ]\n }\n }",
+ "data": "{\n \"id\": 1,\n \"name\": \"measureName_edit\",\n \"description\": \"measure description\",\n \"organization\": \"orgName\",\n \"owner\": \"test\",\n \"deleted\": false,\n \"dq.type\": \"accuracy\",\n \"process.type\": \"batch\",\n \"data.sources\": [\n {\n \"id\": 1,\n \"name\": \"source\",\n \"connectors\": [\n {\n \"id\": 1,\n \"name\": \"connector_name_source\",\n \"type\": \"HIVE\",\n \"version\": \"1.2\",\n \"predicates\": [\n {\n \"id\": 1,\n \"type\": \"file.exist\",\n \"config\": {\n \"root.path\": \"hdfs:///griffin/demo_src\",\n
\"path\": \"/dt=#YYYYMMdd#/hour=#HH#/_DONE\"\n }\n }\n ],\n \"data.unit\": \"1h\",\n \"config\": {\n \"database\": \"default\",\n \"table.name\": \"demo_src\",\n \"where\": \"dt=#YYYYMMdd# AND hour=#HH#\"\n }\n }\n ]\n },\n {\n \"id\": 2,\n \"name\": \"target\",\n \"connectors\": [\n {\n \"id\": 2,\n \"name\": \"connector_name_target\",\n \"type\": \"HIVE\",\n \"version\": \"1.2\",\n \"predicates\": [\n {\n \"id\": 2,\n \"type\": \
"file.exist\",\n \"config\": {\n \"root.path\": \"hdfs:///griffin/demo_src\",\n \"path\": \"/dt=#YYYYMMdd#/hour=#HH#/_DONE\"\n }\n }\n ],\n \"data.unit\": \"1h\",\n \"config\": {\n \"database\": \"default\",\n \"table.name\": \"demo_src\",\n \"where\": \"dt=#YYYYMMdd# AND hour=#HH#\"\n }\n }\n ]\n }\n ],\n \"evaluate.rule\": {\n \"id\": 1,\n \"rules\": [\n {\n \"id\": 1,\n \"rule\": \"source.desc=target.desc\",\n \"name\": \"rule_name\",\n \"description\": \"Total count\",\n
\"dsl.type\": \"griffin-dsl\",\n \"dq.type\": \"accuracy\",\n \"details\": {}\n }\n ]\n },\n \"measure.type\": \"griffin\"\n }",
"method": "PUT",
"dataMode": "raw"
}
}
],
- "rawModeData": "{\n \"id\": 1,\n \"name\": \"measureName_test_edit\",\n \"description\": \"This is a test measure\",\n \"organization\": \"orgName\",\n \"evaluateRule\": {\n \"rules\": [\n {\n \"rule\": \"source.id = target.id and source.age = target.age and source.desc = target.desc\",\n \"dsl.type\": \"griffin-dsl\",\n \"dq.type\": \"accuracy\"\n }\n ]\n },\n \"owner\": \"test\",\n \"deleted\": false,\n \"process.type\": \"batch\",\n \"data.sources\": [\n {\n \"name\": \"source\",\n \"connectors\": [\n {\n \"type\": \"HIVE\",\n \"version\": \"1.2\",\n \"config\": {\n \"database\": \"default\",\n \"table.name\": \"demo_src\
"\n }\n }\n ]\n },\n {\n \"name\": \"target\",\n \"connectors\": [\n {\n \"type\": \"HIVE\",\n \"version\": \"1.2\",\n \"config\": {\n \"database\": \"default\",\n \"table.name\": \"demo_tgt\"\n }\n }\n ]\n }\n ]\n}",
- "collection_id": "a743e1b9-583f-6bd7-e2ae-f03a1f807c63"
+ "rawModeData": "{\n \"id\": 1,\n \"name\": \"measureName_edit\",\n \"description\": \"measure description\",\n \"organization\": \"orgName\",\n \"owner\": \"test\",\n \"deleted\": false,\n \"dq.type\": \"accuracy\",\n \"process.type\": \"batch\",\n \"data.sources\": [\n {\n \"id\": 1,\n \"name\": \"source\",\n \"connectors\": [\n {\n \"id\": 1,\n \"name\": \"connector_name_source\",\n \"type\": \"HIVE\",\n \"version\": \"1.2\",\n \"predicates\": [\n {\n \"id\": 1,\n \"type\": \"file.exist\",\n \"config\": {\n \"root.path\": \"hdfs:///griffin/demo_src\",\n
\"path\": \"/dt=#YYYYMMdd#/hour=#HH#/_DONE\"\n }\n }\n ],\n \"data.unit\": \"1h\",\n \"config\": {\n \"database\": \"default\",\n \"table.name\": \"demo_src\",\n \"where\": \"dt=#YYYYMMdd# AND hour=#HH#\"\n }\n }\n ]\n },\n {\n \"id\": 2,\n \"name\": \"target\",\n \"connectors\": [\n {\n \"id\": 2,\n \"name\": \"connector_name_target\",\n \"type\": \"HIVE\",\n \"version\": \"1.2\",\n \"predicates\": [\n {\n \"id\": 2,\n \"type\
": \"file.exist\",\n \"config\": {\n \"root.path\": \"hdfs:///griffin/demo_src\",\n \"path\": \"/dt=#YYYYMMdd#/hour=#HH#/_DONE\"\n }\n }\n ],\n \"data.unit\": \"1h\",\n \"config\": {\n \"database\": \"default\",\n \"table.name\": \"demo_src\",\n \"where\": \"dt=#YYYYMMdd# AND hour=#HH#\"\n }\n }\n ]\n }\n ],\n \"evaluate.rule\": {\n \"id\": 1,\n \"rules\": [\n {\n \"id\": 1,\n \"rule\": \"source.desc=target.desc\",\n \"name\": \"rule_name\",\n \"description\": \"Total count\",\n
\"dsl.type\": \"griffin-dsl\",\n \"dq.type\": \"accuracy\",\n \"details\": {}\n }\n ]\n },\n \"measure.type\": \"griffin\"\n }"
},
{
"id": "2bfc82ab-ec97-ee89-d6b4-db5ffefce28b",
@@ -704,7 +703,7 @@
"tests": null,
"currentHelper": "normal",
"helperAttributes": {},
- "time": 1515398727266,
+ "time": 1516341583244,
"name": "Add measure",
"description": "`POST /api/v1/measures`\n\n#### Request Header\nkey | value\n--- | ---\nContent-Type | application/json\n\n#### Request Body\n\nname | description | type\n--- | --- | --- \nmeasure | measure entity | Measure\n\nThere are two different measures that are griffin measure and external measure.\nIf you want to create an external measure,you can use following example json in request body.\n```\n{\n \"type\": \"external\",\n \"name\": \"external_name\",\n \"description\": \" test measure\",\n \"organization\": \"orgName\",\n \"owner\": \"test\",\n \"metricName\": \"metricName\"\n}\n```\nPostman gives a griffin measure example in request body and response body. \n#### Response Body Sample\n```\n{\n \"code\": 201,\n \"description\": \"Create Measure Succeed\"\n}\n```\n\nIt may return failed messages.Such as,\n\n```\n {\n \"code\": 410,\n \"description\": \"Create Measure Failed, duplicate records\"\n}\n\n```\n\nThe reason for failure may be that measu
re name already exists.You can change measure name to make it unique.\n\n```\n {\n \"code\": 401,\n \"description\": \"Create Measure Failed\"\n}\n```\nThe reason for failure may be that connector names already exist or connector names are empty.",
"collectionId": "a743e1b9-583f-6bd7-e2ae-f03a1f807c63",
@@ -790,14 +789,13 @@
}
],
"headers": "Content-Type: application/json\n",
- "data": "{\r\n \"name\":\"measure_name\",\r\n\t\"type\":\"griffin\",\r\n \"description\":\"create a measure\",\r\n \"organization\":\"test\",\r\n \"evaluate.rule\":{\r\n \"rules\":[\r\n {\r\n \"rule\":\"source.desc=target.desc\",\r\n \"dsl.type\":\"griffin-dsl\",\r\n \"dq.type\":\"accuracy\",\r\n \"details\":{}\r\n }\r\n ]\r\n },\r\n \"owner\":\"test\",\r\n \"process.type\":\"batch\",\r\n \"data.sources\":[\r\n {\r\n \"name\":\"source\",\r\n \"connectors\":[\r\n {\r\n\t\t\t\t\t\"name\":\"connector_name_source\",\r\n \"type\":\"HIVE\",\r\n \"version\":\"1.2\",\r\n\t\t\t\t\t\"data.unit\":\"1h\",\r\n \"config\":{\r\n \"database\":\"default\",\r\n \"table.name\":\"demo_src\",\r\n \"where\":\"dt=#
YYYYMMdd# AND hour=#HH#\"\r\n },\r\n \"predicates\":[\r\n {\r\n \"type\":\"file.exist\",\r\n \"config\":{\r\n \"root.path\":\"hdfs:///griffin/demo_src\",\r\n \"path\":\"/dt=#YYYYMMdd#/hour=#HH#/_DONE\"\r\n }\r\n }\r\n ]\r\n }\r\n ]\r\n },\r\n {\r\n \"name\":\"target\",\r\n \"connectors\":[\r\n {\r\n\t\t\t\t\t\"name\":\"connector_name_target\",\r\n \"type\":\"HIVE\",\r\n \"version\":\"1.2\",\r\n\t\t\t\t\t\"data.unit\":\"1h\",\r\n \"config\":{\r\n \"database\":\"default\",\r\n \"table.name\":\"demo_src\",\r\n \"where\":\"dt=#YYYYMMdd# AND hour=
#HH#\"\r\n },\r\n \"predicates\":[\r\n {\r\n \"type\":\"file.exist\",\r\n \"config\":{\r\n \"root.path\":\"hdfs:///griffin/demo_src\",\r\n \"path\":\"/dt=#YYYYMMdd#/hour=#HH#/_DONE\"\r\n }\r\n }\r\n ]\r\n }\r\n ]\r\n }\r\n ]\r\n}",
+ "data": "{\r\n \"name\":\"measureName\",\r\n\t\"measure.type\":\"griffin\",\r\n \"description\":\"measure description\",\r\n \"organization\":\"orgName\",\r\n\t\"owner\":\"test\",\r\n \"process.type\":\"batch\",\r\n\t\"dq.type\":\"accuracy\",\r\n \"evaluate.rule\":{\r\n \"rules\":[\r\n {\r\n\t\t\t\t\"name\":\"rule_name\",\r\n \"rule\":\"source.desc=target.desc\",\r\n\t\t\t\t\"description\":\"Total count\",\r\n \"dsl.type\":\"griffin-dsl\",\r\n \"dq.type\":\"accuracy\",\r\n \"details\":{}\r\n }\r\n ]\r\n },\r\n \r\n \"data.sources\":[\r\n {\r\n \"name\":\"source\",\r\n \"connectors\":[\r\n {\r\n\t\t\t\t\t\"name\":\"connector_name_source\",\r\n \"type\":\"HIVE\",\r\n \"version\":\"1.2\",\r\n\t\t\t\t\t\"data.unit\":\"1h\",\r\n \"config\":{\r\n
\"database\":\"default\",\r\n \"table.name\":\"demo_src\",\r\n \"where\":\"dt=#YYYYMMdd# AND hour=#HH#\"\r\n },\r\n \"predicates\":[\r\n {\r\n \"type\":\"file.exist\",\r\n \"config\":{\r\n \"root.path\":\"hdfs:///griffin/demo_src\",\r\n \"path\":\"/dt=#YYYYMMdd#/hour=#HH#/_DONE\"\r\n }\r\n }\r\n ]\r\n }\r\n ]\r\n },\r\n {\r\n \"name\":\"target\",\r\n \"connectors\":[\r\n {\r\n\t\t\t\t\t\"name\":\"connector_name_target\",\r\n \"type\":\"HIVE\",\r\n \"version\":\"1.2\",\r\n\t\t\t\t\t\"data.unit\":\"1h\",\r\n \"config\":{\r\n \"database\":\"d
efault\",\r\n \"table.name\":\"demo_src\",\r\n \"where\":\"dt=#YYYYMMdd# AND hour=#HH#\"\r\n },\r\n \"predicates\":[\r\n {\r\n \"type\":\"file.exist\",\r\n \"config\":{\r\n \"root.path\":\"hdfs:///griffin/demo_src\",\r\n \"path\":\"/dt=#YYYYMMdd#/hour=#HH#/_DONE\"\r\n }\r\n }\r\n ]\r\n }\r\n ]\r\n }\r\n ]\r\n}",
"method": "POST",
"dataMode": "raw"
}
}
],
- "rawModeData": "{\r\n \"name\":\"measure_official\",\r\n\t\"type\":\"griffin\",\r\n \"description\":\"create a measure\",\r\n \"organization\":\"test\",\r\n \"evaluate.rule\":{\r\n \"rules\":[\r\n {\r\n \"rule\":\"source.desc=target.desc\",\r\n \"dsl.type\":\"griffin-dsl\",\r\n \"dq.type\":\"accuracy\",\r\n \"details\":{}\r\n }\r\n ]\r\n },\r\n \"owner\":\"test\",\r\n \"process.type\":\"batch\",\r\n \"data.sources\":[\r\n {\r\n \"name\":\"source\",\r\n \"connectors\":[\r\n {\r\n\t\t\t\t\t\"name\":\"connector_name_source\",\r\n \"type\":\"HIVE\",\r\n \"version\":\"1.2\",\r\n\t\t\t\t\t\"data.unit\":\"1h\",\r\n \"config\":{\r\n \"database\":\"default\",\r\n \"table.name\":\"demo_src\",\r\n \"where\
":\"dt=#YYYYMMdd# AND hour=#HH#\"\r\n },\r\n \"predicates\":[\r\n {\r\n \"type\":\"file.exist\",\r\n \"config\":{\r\n \"root.path\":\"hdfs:///griffin/demo_src\",\r\n \"path\":\"/dt=#YYYYMMdd#/hour=#HH#/_DONE\"\r\n }\r\n }\r\n ]\r\n }\r\n ]\r\n },\r\n {\r\n \"name\":\"target\",\r\n \"connectors\":[\r\n {\r\n\t\t\t\t\t\"name\":\"connector_name_target\",\r\n \"type\":\"HIVE\",\r\n \"version\":\"1.2\",\r\n\t\t\t\t\t\"data.unit\":\"1h\",\r\n \"config\":{\r\n \"database\":\"default\",\r\n \"table.name\":\"demo_src\",\r\n \"where\":\"dt=#YYYYMMdd# A
ND hour=#HH#\"\r\n },\r\n \"predicates\":[\r\n {\r\n \"type\":\"file.exist\",\r\n \"config\":{\r\n \"root.path\":\"hdfs:///griffin/demo_src\",\r\n \"path\":\"/dt=#YYYYMMdd#/hour=#HH#/_DONE\"\r\n }\r\n }\r\n ]\r\n }\r\n ]\r\n }\r\n ]\r\n}",
- "collection_id": "a743e1b9-583f-6bd7-e2ae-f03a1f807c63"
+ "rawModeData": "{\r\n \"name\":\"measureName\",\r\n\t\"measure.type\":\"griffin\",\r\n \"description\":\"measure description\",\r\n \"organization\":\"orgName\",\r\n\t\"owner\":\"test\",\r\n \"process.type\":\"batch\",\r\n\t\"dq.type\":\"accuracy\",\r\n \"evaluate.rule\":{\r\n \"rules\":[\r\n {\r\n\t\t\t\t\"name\":\"rule_name\",\r\n \"rule\":\"source.desc=target.desc\",\r\n\t\t\t\t\"description\":\"Total count\",\r\n \"dsl.type\":\"griffin-dsl\",\r\n \"dq.type\":\"accuracy\",\r\n \"details\":{}\r\n }\r\n ]\r\n },\r\n \r\n \"data.sources\":[\r\n {\r\n \"name\":\"source\",\r\n \"connectors\":[\r\n {\r\n\t\t\t\t\t\"name\":\"connector_name_source\",\r\n \"type\":\"HIVE\",\r\n \"version\":\"1.2\",\r\n\t\t\t\t\t\"data.unit\":\"1h\",\r\n \"config\":{\r\n
\"database\":\"default\",\r\n \"table.name\":\"demo_src\",\r\n \"where\":\"dt=#YYYYMMdd# AND hour=#HH#\"\r\n },\r\n \"predicates\":[\r\n {\r\n \"type\":\"file.exist\",\r\n \"config\":{\r\n \"root.path\":\"hdfs:///griffin/demo_src\",\r\n \"path\":\"/dt=#YYYYMMdd#/hour=#HH#/_DONE\"\r\n }\r\n }\r\n ]\r\n }\r\n ]\r\n },\r\n {\r\n \"name\":\"target\",\r\n \"connectors\":[\r\n {\r\n\t\t\t\t\t\"name\":\"connector_name_target\",\r\n \"type\":\"HIVE\",\r\n \"version\":\"1.2\",\r\n\t\t\t\t\t\"data.unit\":\"1h\",\r\n \"config\":{\r\n \"database\"
:\"default\",\r\n \"table.name\":\"demo_src\",\r\n \"where\":\"dt=#YYYYMMdd# AND hour=#HH#\"\r\n },\r\n \"predicates\":[\r\n {\r\n \"type\":\"file.exist\",\r\n \"config\":{\r\n \"root.path\":\"hdfs:///griffin/demo_src\",\r\n \"path\":\"/dt=#YYYYMMdd#/hour=#HH#/_DONE\"\r\n }\r\n }\r\n ]\r\n }\r\n ]\r\n }\r\n ]\r\n}"
},
{
"id": "45aef93d-2bcf-4a1f-245a-29611d3d740e",
@@ -1092,7 +1090,7 @@
],
"cookies": [],
"mime": "",
- "text": "[{\"id\":2,\"name\":\"measureName_test_edit\",\"description\":\"This is a test measure\",\"organization\":\"orgName\",\"evaluateRule\":{\"id\":18,\"rules\":[{\"id\":10,\"rule\":\"source.id==target.id\",\"dsl.type\":\"griffin-dsl\",\"dq.type\":\"accuracy\"}]},\"owner\":\"test\",\"deleted\":false,\"process.type\":\"batch\",\"data.sources\":[{\"id\":35,\"name\":\"source\",\"connectors\":[{\"id\":19,\"type\":\"HIVE\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"demo_src\"}}]},{\"id\":36,\"name\":\"target\",\"connectors\":[{\"id\":20,\"type\":\"HIVE\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"demo_tgt\"}}]}]},{\"id\":6,\"name\":\"third_measure\",\"description\":null,\"organization\":\"ebay\",\"evaluateRule\":{\"id\":6,\"rules\":[{\"id\":6,\"rule\":\"source.id=target.id AND source.age=target.age\",\"dsl.type\":\"griffin-dsl\",\"dq.type\":\"accuracy\"}]},\"owner\":\"test\",\"deleted\":false,\"process.type\":\"batch\
",\"data.sources\":[{\"id\":11,\"name\":\"source\",\"connectors\":[{\"id\":11,\"type\":\"HIVE\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"demo_src\"}}]},{\"id\":12,\"name\":\"target\",\"connectors\":[{\"id\":12,\"type\":\"HIVE\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"demo_tgt\"}}]}]},{\"id\":8,\"name\":\"measure1\",\"description\":null,\"organization\":\"test\",\"evaluateRule\":{\"id\":8,\"rules\":[{\"id\":8,\"rule\":\"source.age=target.age\",\"dsl.type\":\"griffin-dsl\",\"dq.type\":\"accuracy\"}]},\"owner\":\"test\",\"deleted\":false,\"process.type\":\"batch\",\"data.sources\":[{\"id\":15,\"name\":\"source\",\"connectors\":[{\"id\":15,\"type\":\"HIVE\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"demo_src\"}}]},{\"id\":16,\"name\":\"target\",\"connectors\":[{\"id\":16,\"type\":\"HIVE\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"demo_tgt\"}}]}]},{\"id\":9,\"
name\":\"measureName_test_edit\",\"description\":\"This is a test measure\",\"organization\":\"orgName\",\"evaluateRule\":{\"id\":14,\"rules\":[]},\"owner\":\"test\",\"deleted\":false,\"process.type\":\"batch\",\"data.sources\":[{\"id\":27,\"name\":null,\"connectors\":[]},{\"id\":28,\"name\":null,\"connectors\":[]}]},{\"id\":10,\"name\":\"measureName1\",\"description\":\"This is a test measure\",\"organization\":\"orgName\",\"evaluateRule\":{\"id\":19,\"rules\":[{\"id\":11,\"rule\":\"source.id==target.id\",\"dsl.type\":\"griffin-dsl\",\"dq.type\":\"accuracy\"}]},\"owner\":\"test\",\"deleted\":false,\"process.type\":\"batch\",\"data.sources\":[{\"id\":37,\"name\":\"source\",\"connectors\":[{\"id\":21,\"type\":\"HIVE\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"demo_src\"}}]},{\"id\":38,\"name\":\"target\",\"connectors\":[{\"id\":22,\"type\":\"HIVE\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"demo_tgt\"}}]}]}]",
+ "text": "[\n {\n \"measure.type\": \"griffin\",\n \"id\": 1,\n \"name\": \"measureName\",\n \"description\": \"measure description\",\n \"organization\": \"orgName\",\n \"owner\": \"test\",\n \"deleted\": false,\n \"dq.type\": \"accuracy\",\n \"process.type\": \"batch\",\n \"data.sources\": [\n {\n \"id\": 1,\n \"name\": \"source\",\n \"connectors\": [\n {\n \"id\": 1,\n \"name\": \"connector_name_source\",\n \"type\": \"HIVE\",\n \"version\": \"1.2\",\n \"predicates\": [\n {\n \"id\": 1,\n \"type\": \"file.exist\",\n \"config\": {\n \"root.path\": \"hdfs:///griffin/d
emo_src\",\n \"path\": \"/dt=#YYYYMMdd#/hour=#HH#/_DONE\"\n }\n }\n ],\n \"data.unit\": \"1h\",\n \"config\": {\n \"database\": \"default\",\n \"table.name\": \"demo_src\",\n \"where\": \"dt=#YYYYMMdd# AND hour=#HH#\"\n }\n }\n ]\n },\n {\n \"id\": 2,\n \"name\": \"target\",\n \"connectors\": [\n {\n \"id\": 2,\n \"name\": \"connector_name_target\",\n \"type\": \"HIVE\",\n \"version\": \"1.2\",\n \"predicates\": [\n {\n \"id\": 2,\n
\"type\": \"file.exist\",\n \"config\": {\n \"root.path\": \"hdfs:///griffin/demo_src\",\n \"path\": \"/dt=#YYYYMMdd#/hour=#HH#/_DONE\"\n }\n }\n ],\n \"data.unit\": \"1h\",\n \"config\": {\n \"database\": \"default\",\n \"table.name\": \"demo_src\",\n \"where\": \"dt=#YYYYMMdd# AND hour=#HH#\"\n }\n }\n ]\n }\n ],\n \"evaluate.rule\": {\n \"id\": 1,\n \"rules\": [\n {\n \"id\": 1,\n \"rule\": \"source.desc=target.desc\",\n \"name\": \"rule_name\",\n \"descri
ption\": \"Total count\",\n \"dsl.type\": \"griffin-dsl\",\n \"dq.type\": \"accuracy\",\n \"details\": {}\n }\n ]\n }\n },\n {\n \"measure.type\": \"external\",\n \"id\": 2,\n \"name\": \"external_name\",\n \"description\": \" test measure\",\n \"organization\": \"orgName\",\n \"owner\": \"test\",\n \"deleted\": false,\n \"metricName\": \"metricName\"\n }\n]",
"language": "json",
"rawDataType": "text",
"previewType": "text",
@@ -1106,9 +1104,24 @@
"isSample": true,
"scrollToResult": false,
"runTests": false,
- "request": "738b5d6d-4fea-85af-89a8-949468d3cde2",
- "owner": "503523",
- "requestObject": "{\"url\":\"{{BASE_PATH}}/api/v1/measures/owner/:owner\",\"pathVariables\":{\"owner\":\"test\"},\"pathVariableData\":[{\"key\":\"owner\",\"value\":\"test\"}],\"queryParams\":[],\"headerData\":[],\"headers\":\"\",\"data\":null,\"method\":\"GET\",\"dataMode\":\"params\"}"
+ "request": {
+ "url": "{{BASE_PATH}}/api/v1/measures/owner/:owner",
+ "pathVariables": {
+ "owner": "test"
+ },
+ "pathVariableData": [
+ {
+ "key": "owner",
+ "value": "test"
+ }
+ ],
+ "queryParams": [],
+ "headerData": [],
+ "headers": "",
+ "data": null,
+ "method": "GET",
+ "dataMode": "params"
+ }
}
],
"isFromCollection": true,
@@ -2301,7 +2314,7 @@
],
"cookies": [],
"mime": "",
- "text": "{\"id\":1,\"name\":\"measureName_test_edit\",\"description\":\"This is a test measure\",\"organization\":\"orgName\",\"evaluateRule\":{\"id\":20,\"rules\":[{\"id\":12,\"rule\":\"source.id = target.id and source.age = target.age and source.desc = target.desc\",\"dsl.type\":\"griffin-dsl\",\"dq.type\":\"accuracy\"}]},\"owner\":\"test\",\"deleted\":false,\"process.type\":\"batch\",\"data.sources\":[{\"id\":39,\"name\":\"source\",\"connectors\":[{\"id\":23,\"type\":\"HIVE\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"demo_src\"}}]},{\"id\":40,\"name\":\"target\",\"connectors\":[{\"id\":24,\"type\":\"HIVE\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"demo_tgt\"}}]}]}",
+ "text": "{\n \"measure.type\": \"griffin\",\n \"id\": 1,\n \"name\": \"measureName\",\n \"description\": \"measure description\",\n \"organization\": \"orgName\",\n \"owner\": \"test\",\n \"deleted\": false,\n \"dq.type\": \"accuracy\",\n \"process.type\": \"batch\",\n \"data.sources\": [\n {\n \"id\": 1,\n \"name\": \"source\",\n \"connectors\": [\n {\n \"id\": 1,\n \"name\": \"connector_name_source\",\n \"type\": \"HIVE\",\n \"version\": \"1.2\",\n \"predicates\": [\n {\n \"id\": 1,\n \"type\": \"file.exist\",\n \"config\": {\n \"root.path\": \"hdfs:///griffin/demo_src\",\n \"path\": \"/dt=#YYYYMMdd#/hour=#HH#/_DONE\"\n
}\n }\n ],\n \"data.unit\": \"1h\",\n \"config\": {\n \"database\": \"default\",\n \"table.name\": \"demo_src\",\n \"where\": \"dt=#YYYYMMdd# AND hour=#HH#\"\n }\n }\n ]\n },\n {\n \"id\": 2,\n \"name\": \"target\",\n \"connectors\": [\n {\n \"id\": 2,\n \"name\": \"connector_name_target\",\n \"type\": \"HIVE\",\n \"version\": \"1.2\",\n \"predicates\": [\n {\n \"id\": 2,\n \"type\": \"file.exist\",\n \"config\": {\n \"root.path\": \"hdfs:///griffin/demo_src\",\n \
"path\": \"/dt=#YYYYMMdd#/hour=#HH#/_DONE\"\n }\n }\n ],\n \"data.unit\": \"1h\",\n \"config\": {\n \"database\": \"default\",\n \"table.name\": \"demo_src\",\n \"where\": \"dt=#YYYYMMdd# AND hour=#HH#\"\n }\n }\n ]\n }\n ],\n \"evaluate.rule\": {\n \"id\": 1,\n \"rules\": [\n {\n \"id\": 1,\n \"rule\": \"source.desc=target.desc\",\n \"name\": \"rule_name\",\n \"description\": \"Total count\",\n \"dsl.type\": \"griffin-dsl\",\n \"dq.type\": \"accuracy\",\n \"details\": {}\n }\n ]\n }\n}",
"language": "json",
"rawDataType": "text",
"previewType": "text",
@@ -2332,8 +2345,7 @@
"data": null,
"method": "GET",
"dataMode": "params"
- },
- "owner": "503523"
+ }
}
],
"collection_id": "a743e1b9-583f-6bd7-e2ae-f03a1f807c63"
@@ -2804,6 +2816,7 @@
"headers": "",
"headerData": [],
"url": "{{BASE_PATH}}/api/v1/measures",
+ "folder": "523a7f9f-1970-018e-9241-57caa3d6ea60",
"queryParams": [],
"preRequestScript": null,
"pathVariables": {},
@@ -2814,8 +2827,8 @@
"version": 2,
"tests": null,
"currentHelper": "normal",
- "helperAttributes": "{}",
- "time": 1508997057521,
+ "helperAttributes": {},
+ "time": 1516340119702,
"name": "Get measures",
"description": "`GET /api/v1/measures`",
"collectionId": "a743e1b9-583f-6bd7-e2ae-f03a1f807c63",
@@ -2873,7 +2886,7 @@
],
"cookies": [],
"mime": "",
- "text": "[{\"id\":2,\"name\":\"measureName_test_edit\",\"description\":\"This is a test measure\",\"organization\":\"orgName\",\"evaluateRule\":{\"id\":18,\"rules\":[{\"id\":10,\"rule\":\"source.id==target.id\",\"dsl.type\":\"griffin-dsl\",\"dq.type\":\"accuracy\"}]},\"owner\":\"test\",\"deleted\":false,\"process.type\":\"batch\",\"data.sources\":[{\"id\":35,\"name\":\"source\",\"connectors\":[{\"id\":19,\"type\":\"HIVE\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"demo_src\"}}]},{\"id\":36,\"name\":\"target\",\"connectors\":[{\"id\":20,\"type\":\"HIVE\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"demo_tgt\"}}]}]},{\"id\":6,\"name\":\"third_measure\",\"description\":null,\"organization\":\"ebay\",\"evaluateRule\":{\"id\":6,\"rules\":[{\"id\":6,\"rule\":\"source.id=target.id AND source.age=target.age\",\"dsl.type\":\"griffin-dsl\",\"dq.type\":\"accuracy\"}]},\"owner\":\"test\",\"deleted\":false,\"process.type\":\"batch\
",\"data.sources\":[{\"id\":11,\"name\":\"source\",\"connectors\":[{\"id\":11,\"type\":\"HIVE\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"demo_src\"}}]},{\"id\":12,\"name\":\"target\",\"connectors\":[{\"id\":12,\"type\":\"HIVE\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"demo_tgt\"}}]}]},{\"id\":8,\"name\":\"measure1\",\"description\":null,\"organization\":\"test\",\"evaluateRule\":{\"id\":8,\"rules\":[{\"id\":8,\"rule\":\"source.age=target.age\",\"dsl.type\":\"griffin-dsl\",\"dq.type\":\"accuracy\"}]},\"owner\":\"test\",\"deleted\":false,\"process.type\":\"batch\",\"data.sources\":[{\"id\":15,\"name\":\"source\",\"connectors\":[{\"id\":15,\"type\":\"HIVE\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"demo_src\"}}]},{\"id\":16,\"name\":\"target\",\"connectors\":[{\"id\":16,\"type\":\"HIVE\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"demo_tgt\"}}]}]},{\"id\":9,\"
name\":\"measureName_test_edit\",\"description\":\"This is a test measure\",\"organization\":\"orgName\",\"evaluateRule\":{\"id\":14,\"rules\":[]},\"owner\":\"test\",\"deleted\":false,\"process.type\":\"batch\",\"data.sources\":[{\"id\":27,\"name\":null,\"connectors\":[]},{\"id\":28,\"name\":null,\"connectors\":[]}]},{\"id\":10,\"name\":\"measureName1\",\"description\":\"This is a test measure\",\"organization\":\"orgName\",\"evaluateRule\":{\"id\":19,\"rules\":[{\"id\":11,\"rule\":\"source.id==target.id\",\"dsl.type\":\"griffin-dsl\",\"dq.type\":\"accuracy\"}]},\"owner\":\"test\",\"deleted\":false,\"process.type\":\"batch\",\"data.sources\":[{\"id\":37,\"name\":\"source\",\"connectors\":[{\"id\":21,\"type\":\"HIVE\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"demo_src\"}}]},{\"id\":38,\"name\":\"target\",\"connectors\":[{\"id\":22,\"type\":\"HIVE\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"demo_tgt\"}}]}]}]",
+ "text": "[\n {\n \"id\": 1,\n \"name\": \"measureName\",\n \"description\": \"measure description\",\n \"organization\": \"orgName\",\n \"owner\": \"test\",\n \"deleted\": false,\n \"dq.type\": \"accuracy\",\n \"process.type\": \"batch\",\n \"data.sources\": [\n {\n \"id\": 1,\n \"name\": \"source\",\n \"connectors\": [\n {\n \"id\": 1,\n \"name\": \"connector_name_source\",\n \"type\": \"HIVE\",\n \"version\": \"1.2\",\n \"predicates\": [\n {\n \"id\": 1,\n \"type\": \"file.exist\",\n \"config\": {\n \"root.path\": \"hdfs:///griffin/demo_src\",\n
\"path\": \"/dt=#YYYYMMdd#/hour=#HH#/_DONE\"\n }\n }\n ],\n \"data.unit\": \"1h\",\n \"config\": {\n \"database\": \"default\",\n \"table.name\": \"demo_src\",\n \"where\": \"dt=#YYYYMMdd# AND hour=#HH#\"\n }\n }\n ]\n },\n {\n \"id\": 2,\n \"name\": \"target\",\n \"connectors\": [\n {\n \"id\": 2,\n \"name\": \"connector_name_target\",\n \"type\": \"HIVE\",\n \"version\": \"1.2\",\n \"predicates\": [\n {\n \"id\": 2,\n \"type\":
\"file.exist\",\n \"config\": {\n \"root.path\": \"hdfs:///griffin/demo_src\",\n \"path\": \"/dt=#YYYYMMdd#/hour=#HH#/_DONE\"\n }\n }\n ],\n \"data.unit\": \"1h\",\n \"config\": {\n \"database\": \"default\",\n \"table.name\": \"demo_src\",\n \"where\": \"dt=#YYYYMMdd# AND hour=#HH#\"\n }\n }\n ]\n }\n ],\n \"evaluate.rule\": {\n \"id\": 1,\n \"rules\": [\n {\n \"id\": 1,\n \"rule\": \"source.desc=target.desc\",\n \"name\": \"rule_name\",\n \"description\": \"Total count\",\n
\"dsl.type\": \"griffin-dsl\",\n \"dq.type\": \"accuracy\",\n \"details\": {}\n }\n ]\n },\n \"measure.type\": \"griffin\"\n },\n {\n \"id\": 2,\n \"name\": \"external_name\",\n \"description\": \" test measure\",\n \"organization\": \"orgName\",\n \"owner\": \"test\",\n \"deleted\": false,\n \"metricName\": \"metricName\",\n \"measure.type\": \"external\"\n }\n]",
"language": "json",
"rawDataType": "text",
"previewType": "text",
@@ -2887,14 +2900,19 @@
"isSample": true,
"scrollToResult": false,
"runTests": false,
- "request": "d4242bb8-d273-6bdd-588a-ec5367c3fe57",
- "owner": "503523",
- "requestObject": "{\"url\":\"{{BASE_PATH}}/api/v1/measures\",\"pathVariables\":{},\"pathVariableData\":[],\"queryParams\":[],\"headerData\":[],\"headers\":\"\",\"data\":null,\"method\":\"GET\",\"dataMode\":\"params\"}"
+ "request": {
+ "url": "{{BASE_PATH}}/api/v1/measures",
+ "pathVariables": {},
+ "pathVariableData": [],
+ "queryParams": [],
+ "headerData": [],
+ "headers": "",
+ "data": null,
+ "method": "GET",
+ "dataMode": "params"
+ }
}
- ],
- "collection_id": "a743e1b9-583f-6bd7-e2ae-f03a1f807c63",
- "isFromCollection": true,
- "folder": "523a7f9f-1970-018e-9241-57caa3d6ea60"
+ ]
},
{
"id": "f989dff6-0847-cc8a-0989-ccae76f33562",
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
index 95b8676..bfaba35 100644
--- a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
@@ -47,14 +47,14 @@ public class PropertiesConfig {
this.location = location;
}
- private String getPath(String defaultPath, String name) {
+ private String getPath(String defaultPath, String name) throws FileNotFoundException {
String path = defaultPath;
File file = new File(location);
LOGGER.info("File absolute path:" + file.getAbsolutePath());
File[] files = file.listFiles();
if (files == null || files.length == 0) {
LOGGER.error("The defaultPath {} does not exist.Please check your config in application.properties.", location);
- throw new NullPointerException();
+ throw new FileNotFoundException();
}
for (File f : files) {
if (f.getName().equals(name)) {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
index a97d812..7354176 100644
--- a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
+++ b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
@@ -32,8 +32,8 @@ import static org.apache.griffin.core.job.JobInstance.PATH_CONNECTOR_CHARACTER;
public class FileExistPredicator implements Predicator {
private static final Logger LOGGER = LoggerFactory.getLogger(FileExistPredicator.class);
- public static final String PREDICT_PATH = "path";
- public static final String PREDICT_ROOT_PATH = "root.path";
+ private static final String PREDICT_PATH = "path";
+ private static final String PREDICT_ROOT_PATH = "root.path";
private SegmentPredicate predicate;
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
index 0c8b554..ba0b1fb 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
@@ -52,10 +52,10 @@ import static org.quartz.TriggerKey.triggerKey;
@DisallowConcurrentExecution
public class JobInstance implements Job {
private static final Logger LOGGER = LoggerFactory.getLogger(JobInstance.class);
- static final String MEASURE_KEY = "measure";
- static final String PREDICATES_KEY = "predicts";
- static final String PREDICATE_JOB_NAME = "predicateJobName";
- static final String JOB_NAME = "jobName";
+ public static final String MEASURE_KEY = "measure";
+ public static final String PREDICATES_KEY = "predicts";
+ public static final String PREDICATE_JOB_NAME = "predicateJobName";
+ public static final String JOB_NAME = "jobName";
static final String PATH_CONNECTOR_CHARACTER = ",";
@Autowired
@@ -201,14 +201,14 @@ public class JobInstance implements Job {
* @param conf map with file predicate,data split and partitions info
* @param sampleTs collection of data split start timestamp
* @return all config data combine,like {"where": "year=2017 AND month=11 AND dt=15 AND hour=09,year=2017 AND month=11 AND dt=15 AND hour=10"}
- * or like {"path": "/year=#2017/month=11/dt=15/hour=09/_DONE,/year=#2017/month=11/dt=15/hour=10/_DONE"}
+ * or like {"path": "/year=2017/month=11/dt=15/hour=09/_DONE,/year=2017/month=11/dt=15/hour=10/_DONE"}
*/
private void genConfMap(Map<String, String> conf, Long[] sampleTs) {
for (Map.Entry<String, String> entry : conf.entrySet()) {
String value = entry.getValue();
Set<String> set = new HashSet<>();
for (Long timestamp : sampleTs) {
- set.add(TimeUtil.format(value, timestamp));
+ set.add(TimeUtil.format(value, timestamp,jobSchedule.getTimeZone()));
}
conf.put(entry.getKey(), StringUtils.join(set, PATH_CONNECTOR_CHARACTER));
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
index 92188b4..ef2fb9f 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.dao.DataAccessException;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
@@ -62,8 +63,8 @@ import static org.quartz.TriggerKey.triggerKey;
@Service
public class JobServiceImpl implements JobService {
private static final Logger LOGGER = LoggerFactory.getLogger(JobServiceImpl.class);
- static final String JOB_SCHEDULE_ID = "jobScheduleId";
- static final String GRIFFIN_JOB_ID = "griffinJobId";
+ public static final String JOB_SCHEDULE_ID = "jobScheduleId";
+ public static final String GRIFFIN_JOB_ID = "griffinJobId";
static final int MAX_PAGE_SIZE = 1024;
static final int DEFAULT_PAGE_SIZE = 10;
@@ -143,26 +144,22 @@ public class JobServiceImpl implements JobService {
public GriffinOperationMessage addJob(JobSchedule js) throws Exception {
Long measureId = js.getMeasureId();
GriffinMeasure measure = getMeasureIfValid(measureId);
- if (measure != null && addJob(js, measure)) {
+ if (measure != null) {
+ String qName = getQuartzName(js);
+ String qGroup = getQuartzGroupName();
+ TriggerKey triggerKey = triggerKey(qName, qGroup);
+ if (!isJobScheduleParamValid(js, measure) || factory.getObject().checkExists(triggerKey)) {
+ return CREATE_JOB_FAIL;
+ }
+ GriffinJob job = new GriffinJob(measure.getId(), js.getJobName(), qName, qGroup, false);
+ job = jobRepo.save(job);
+ js = jobScheduleRepo.save(js);
+ addJob(triggerKey, js, job);
return CREATE_JOB_SUCCESS;
}
return CREATE_JOB_FAIL;
}
- private boolean addJob(JobSchedule js, GriffinMeasure measure) throws Exception {
- String qName = getQuartzName(js);
- String qGroup = getQuartzGroupName();
- TriggerKey triggerKey = triggerKey(qName, qGroup);
- if (!isJobScheduleParamValid(js, measure) || factory.getObject().checkExists(triggerKey)) {
- return false;
- }
- GriffinJob job = new GriffinJob(measure.getId(), js.getJobName(), qName, qGroup, false);
- jobRepo.save(job);
- js = jobScheduleRepo.save(js);
- addJob(triggerKey, js, job);
- return true;
- }
-
private void addJob(TriggerKey triggerKey, JobSchedule js, GriffinJob job) throws Exception {
Scheduler scheduler = factory.getObject();
JobDetail jobDetail = addJobDetail(scheduler, triggerKey, js, job);
@@ -185,7 +182,7 @@ public class JobServiceImpl implements JobService {
return false;
}
List<String> names = getConnectorNames(measure);
- return isConnectorNamesValid(js.getSegments(), names);
+ return names != null && isConnectorNamesValid(js.getSegments(), names);
}
private boolean isJobNameValid(String jobName) {
@@ -212,11 +209,18 @@ public class JobServiceImpl implements JobService {
}
private boolean isConnectorNamesValid(List<JobDataSegment> segments, List<String> names) {
+ Set<String> dcSets = new HashSet<>();
for (JobDataSegment segment : segments) {
- if (!isConnectorNameValid(segment.getDataConnectorName(), names)) {
+ String dcName = segment.getDataConnectorName();
+ dcSets.add(dcName);
+ if (!isConnectorNameValid(dcName, names)) {
return false;
}
}
+ if (dcSets.size() < segments.size()) {
+ LOGGER.warn("Connector names in job data segment cannot be repeated.");
+ return false;
+ }
return true;
}
@@ -239,11 +243,11 @@ public class JobServiceImpl implements JobService {
sets.add(dc.getName());
});
}
- names.addAll(sets);
- if (names.size() < sets.size()) {
- LOGGER.error("Connector names cannot be repeated.");
- throw new IllegalArgumentException();
+ if (sets.size() < sources.size()) {
+ LOGGER.warn("Connector names cannot be repeated.");
+ return null;
}
+ names.addAll(sets);
return names;
}
@@ -403,7 +407,7 @@ public class JobServiceImpl implements JobService {
private boolean deleteJob(String group, String name) throws SchedulerException {
Scheduler scheduler = factory.getObject();
JobKey jobKey = new JobKey(name, group);
- if (scheduler.checkExists(jobKey)) {
+ if (!scheduler.checkExists(jobKey)) {
LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName());
return true;
}
@@ -424,10 +428,11 @@ public class JobServiceImpl implements JobService {
LOGGER.info("Measure id {} has no related jobs.", measureId);
return true;
}
+ boolean status = true;
for (GriffinJob job : jobs) {
- deleteJob(job);
+ status = status && deleteJob(job);
}
- return true;
+ return status;
}
@Override
@@ -445,12 +450,13 @@ public class JobServiceImpl implements JobService {
@Scheduled(fixedDelayString = "${jobInstance.expired.milliseconds}")
public void deleteExpiredJobInstance() {
- List<JobInstanceBean> instances = jobInstanceRepo.findByExpireTmsLessThanEqual(System.currentTimeMillis());
+ Long timeMills = System.currentTimeMillis();
+ List<JobInstanceBean> instances = jobInstanceRepo.findByExpireTmsLessThanEqual(timeMills);
if (!pauseJob(instances)) {
LOGGER.error("Pause job failure.");
return;
}
- jobInstanceRepo.deleteByExpireTimestamp(System.currentTimeMillis());
+ jobInstanceRepo.deleteByExpireTimestamp(timeMills);
LOGGER.info("Delete expired job instances success.");
}
@@ -485,6 +491,8 @@ public class JobServiceImpl implements JobService {
LOGGER.error("Job instance json converts to map failed. {}", e.getMessage());
} catch (IllegalArgumentException e) {
LOGGER.error("Livy status is illegal. {}", e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error("Sync job instances failure. {}",e.getMessage());
}
}
@@ -499,6 +507,7 @@ public class JobServiceImpl implements JobService {
instance.setAppUri(appUri);
}
jobInstanceRepo.save(instance);
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java
index 65d8e15..ee5e107 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java
@@ -65,11 +65,11 @@ public class GriffinJob extends AbstractJob {
super();
}
- public GriffinJob(Long measureId, String jobName, String qJobName, String qGroupName, boolean deleted) {
+ public GriffinJob(Long measureId, String jobName, String quartzName, String quartzGroup, boolean deleted) {
super(measureId, jobName, deleted);
this.metricName = jobName;
- this.quartzName = qJobName;
- this.quartzGroup = qGroupName;
+ this.quartzName = quartzName;
+ this.quartzGroup = quartzGroup;
}
public GriffinJob(Long jobId, Long measureId, String jobName, String qJobName, String qGroupName, boolean deleted) {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
index b0f81cb..e765702 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
@@ -40,7 +40,7 @@ public class JobDataSegment extends AbstractAuditableEntity {
@OneToOne(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
@JoinColumn(name = "segment_range_id")
- private SegmentRange segmentRange;
+ private SegmentRange segmentRange = new SegmentRange();
@JsonProperty("as.baseline")
public Boolean getBaseline() {
@@ -80,7 +80,13 @@ public class JobDataSegment extends AbstractAuditableEntity {
}
public JobDataSegment(String dataConnectorName, boolean baseline) {
- this.dataConnectorName =dataConnectorName;
+ this.dataConnectorName = dataConnectorName;
+ this.baseline = baseline;
+ }
+
+ public JobDataSegment(String dataConnectorName, boolean baseline, SegmentRange segmentRange) {
+ this.dataConnectorName = dataConnectorName;
this.baseline = baseline;
+ this.segmentRange = segmentRange;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
index 1406b5e..d1dd44f 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
@@ -30,11 +30,7 @@ import org.apache.griffin.core.util.PropertiesUtil;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Configurable;
-import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.io.ClassPathResource;
-import org.springframework.stereotype.Component;
import javax.persistence.*;
import javax.validation.constraints.NotNull;
@@ -155,7 +151,7 @@ public class JobSchedule extends AbstractAuditableEntity {
*/
private Map<String, Object> defaultPredicatesConfig() throws JsonProcessingException {
String path = "/application.properties";
- Properties appConf = PropertiesUtil.getProperties(path,new ClassPathResource(path));
+ Properties appConf = PropertiesUtil.getProperties(path, new ClassPathResource(path));
Map<String, Object> scheduleConf = new HashMap<>();
Map<String, Object> map = new HashMap<>();
map.put("interval", appConf.getProperty("predicate.job.interval"));
@@ -176,7 +172,7 @@ public class JobSchedule extends AbstractAuditableEntity {
public JobSchedule() throws JsonProcessingException {
}
- public JobSchedule(Long measureId, String jobName, String cronExpression,String timeZone, List<JobDataSegment> segments) throws JsonProcessingException {
+ public JobSchedule(Long measureId, String jobName, String cronExpression, String timeZone, List<JobDataSegment> segments) throws JsonProcessingException {
this.measureId = measureId;
this.jobName = jobName;
this.cronExpression = cronExpression;
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
index 0f5a624..78b2794 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
@@ -78,4 +78,9 @@ public class SegmentPredicate extends AbstractAuditableEntity {
public SegmentPredicate() {
}
+
+ public SegmentPredicate(String type, Map configMap) throws JsonProcessingException {
+ this.type = type;
+ setConfigMap(configMap);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java
index b8ca5cf..5393f22 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java
@@ -50,4 +50,12 @@ public class SegmentRange extends AbstractAuditableEntity {
this.length = length;
}
+ public SegmentRange(String begin, String length) {
+ this.begin = begin;
+ this.length = length;
+ }
+
+ SegmentRange() {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java
index ca9aae1..ab04567 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java
@@ -81,17 +81,17 @@ public class ExternalMeasureOperationImpl implements MeasureOperation {
}
@Override
- public Boolean delete(Measure measure) {
+ public GriffinOperationMessage delete(Measure measure) {
try {
ExternalMeasure em = (ExternalMeasure) measure;
em.setDeleted(true);
em.getVirtualJob().setDeleted(true);
measureRepo.save(em);
- return true;
+ return DELETE_MEASURE_BY_ID_SUCCESS;
} catch (Exception e) {
LOGGER.error("Failed to delete measure. {}", e.getMessage());
}
- return false;
+ return DELETE_MEASURE_BY_ID_FAIL;
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java b/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java
index f21b60d..b5d9805 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java
@@ -77,14 +77,19 @@ public class GriffinMeasureOperationImpl implements MeasureOperation {
}
@Override
- public Boolean delete(Measure measure) {
- boolean pauseStatus = jobService.deleteJobsRelateToMeasure(measure.getId());
- if (!pauseStatus) {
- return false;
+ public GriffinOperationMessage delete(Measure measure) {
+ try {
+ boolean pauseStatus = jobService.deleteJobsRelateToMeasure(measure.getId());
+ if (!pauseStatus) {
+ return DELETE_MEASURE_BY_ID_FAIL;
+ }
+ measure.setDeleted(true);
+ measureRepo.save(measure);
+ return DELETE_MEASURE_BY_ID_SUCCESS;
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage());
}
- measure.setDeleted(true);
- measureRepo.save(measure);
- return true;
+ return DELETE_MEASURE_BY_ID_FAIL;
}
private boolean isConnectorNamesValid(GriffinMeasure measure) {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java
index 80f1f30..81e9f06 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java
@@ -29,6 +29,6 @@ public interface MeasureOperation {
GriffinOperationMessage update(Measure measure);
- Boolean delete(Measure measure);
+ GriffinOperationMessage delete(Measure measure);
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java
index 754f3d1..228d5bf 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java
@@ -19,7 +19,6 @@ under the License.
package org.apache.griffin.core.measure;
-import java.io.Serializable;
import java.util.List;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
index 34a780d..86dc9a9 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
@@ -94,15 +94,8 @@ public class MeasureServiceImpl implements MeasureService {
if (measure == null) {
return RESOURCE_NOT_FOUND;
}
- try {
- MeasureOperation op = getOperation(measure);
- if (op.delete(measure)) {
- return DELETE_MEASURE_BY_ID_SUCCESS;
- }
- } catch (Exception e) {
- LOGGER.error("Delete measure id: {} name: {} failure. {}", measure.getId(), measure.getName(), e.getMessage());
- }
- return DELETE_MEASURE_BY_ID_FAIL;
+ MeasureOperation op = getOperation(measure);
+ return op.delete(measure);
}
private MeasureOperation getOperation(Measure measure) {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/measure/entity/ExternalMeasure.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/ExternalMeasure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/ExternalMeasure.java
index eb4a19d..2339df0 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/ExternalMeasure.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/ExternalMeasure.java
@@ -43,9 +43,10 @@ public class ExternalMeasure extends Measure {
super();
}
- public ExternalMeasure(String name, String description, String organization, String owner, String metricName) {
+ public ExternalMeasure(String name, String description, String organization, String owner, String metricName,VirtualJob vj) {
super(name, description, organization, owner);
this.metricName = metricName;
+ this.virtualJob = vj;
}
public String getMetricName() {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
index c448c0b..6b060b5 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
@@ -34,13 +34,14 @@ import java.util.List;
@Entity
public class GriffinMeasure extends Measure {
+ private String dqType;
+
private String processType;
@Transient
@JsonInclude(JsonInclude.Include.NON_NULL)
private Long timestamp;
-
@NotNull
@OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
@JoinColumn(name = "measure_id")
@@ -51,6 +52,16 @@ public class GriffinMeasure extends Measure {
@JoinColumn(name = "evaluate_rule_id")
private EvaluateRule evaluateRule;
+ @JsonProperty("dq.type")
+ public String getDqType() {
+ return dqType;
+ }
+
+ @JsonProperty("dq.type")
+ public void setDqType(String dqType) {
+ this.dqType = dqType;
+ }
+
@JsonProperty("process.type")
public String getProcessType() {
return processType;
@@ -104,10 +115,9 @@ public class GriffinMeasure extends Measure {
super();
}
- public GriffinMeasure(Long measureId,String name, String description, String organization, String processType, String owner, List<DataSource> dataSources, EvaluateRule evaluateRule) {
- super(name, description, organization, owner);
- this.setId(measureId);
- this.processType = processType;
+ public GriffinMeasure(String name, String owner, List<DataSource> dataSources, EvaluateRule evaluateRule) {
+ this.name = name;
+ this.owner = owner;
this.dataSources = dataSources;
this.evaluateRule = evaluateRule;
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
index cf2daec..a5c97a1 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
@@ -19,6 +19,7 @@ under the License.
package org.apache.griffin.core.measure.entity;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
@@ -29,7 +30,7 @@ import javax.validation.constraints.NotNull;
@Entity
@Inheritance(strategy = InheritanceType.JOINED)
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "measure.type")
@JsonSubTypes({@JsonSubTypes.Type(value = GriffinMeasure.class, name = "griffin"), @JsonSubTypes.Type(value = ExternalMeasure.class, name = "external")})
public abstract class Measure extends AbstractAuditableEntity {
private static final long serialVersionUID = -4748881017029815714L;
@@ -95,5 +96,6 @@ public abstract class Measure extends AbstractAuditableEntity {
this.owner = owner;
}
+ @JsonProperty("measure.type")
public abstract String getType();
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
index f0c6516..8a61bfb 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
@@ -40,25 +40,23 @@ public class Rule extends AbstractAuditableEntity {
private String dqType;
- @Column(length = 10 * 1024)
+ @Column(length = 8 * 1024)
private String rule;
- @JsonIgnore
private String name;
- @JsonIgnore
+ @Column(length = 1024)
private String description;
@JsonIgnore
@Access(AccessType.PROPERTY)
- @Column(length = 10 * 1024)
+ @Column(length = 1024)
private String details;
@Transient
@JsonInclude(JsonInclude.Include.NON_NULL)
private Map<String, Object> detailsMap;
-
@JsonProperty("dsl.type")
public String getDslType() {
return dslType;
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java b/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java
index 976bec2..c88cd3a 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java
@@ -42,14 +42,4 @@ public interface MeasureRepo<T extends Measure> extends CrudRepository<T, Long>
@Query("select m.name from #{#entityName} m " +
"where m.organization= ?1 and m.deleted= ?2")
List<String> findNameByOrganization(String organization, Boolean deleted);
-
- @Query("select m.organization from #{#entityName} m " +
- "where m.name= ?1")
- String findOrgByName(String measureName);
-
-// @Modifying
-// @Transactional
-// @Query("update Measure m "+
-// "set m.description= ?2,m.organization= ?3,m.source= ?4,m.target= ?5,m.evaluateRule= ?6 where m.id= ?1")
-// void update(Long Id, String description, String organization, DataConnector source, DataConnector target, EvaluateRule evaluateRule);
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/util/AvroUtil.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/util/AvroUtil.java b/service/src/main/java/org/apache/griffin/core/util/AvroUtil.java
deleted file mode 100644
index e1fbf99..0000000
--- a/service/src/main/java/org/apache/griffin/core/util/AvroUtil.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.griffin.core.util;
-
-import org.apache.avro.Schema;
-
-public class AvroUtil {
-
- public static Schema schemaOf(String schema) {
- return new Schema.Parser().parse(schema);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java b/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java
index 859fe5b..5f4396a 100644
--- a/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java
@@ -23,10 +23,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.IllegalFormatException;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -55,8 +52,8 @@ public class TimeUtil {
list.add(group.toLowerCase());
}
long time = 0;
- for (int i = 0; i < list.size(); i++) {
- long t = milliseconds(list.get(i).toLowerCase());
+ for (String aList : list) {
+ long t = milliseconds(aList.toLowerCase());
if (positive) {
time += t;
} else {
@@ -67,23 +64,18 @@ public class TimeUtil {
}
private static Long milliseconds(String str) {
- try {
- if (str.endsWith("ms")) {
- return milliseconds(Long.parseLong(str.substring(0, str.length() - 2)), TimeUnit.MILLISECONDS);
- } else if (str.endsWith("s")) {
- return milliseconds(Long.parseLong(str.substring(0, str.length() - 1)), TimeUnit.SECONDS);
- } else if (str.endsWith("m")) {
- return milliseconds(Long.parseLong(str.substring(0, str.length() - 1)), TimeUnit.MINUTES);
- } else if (str.endsWith("h")) {
- return milliseconds(Long.parseLong(str.substring(0, str.length() - 1)), TimeUnit.HOURS);
- } else if (str.endsWith("d")) {
- return milliseconds(Long.parseLong(str.substring(0, str.length() - 1)), TimeUnit.DAYS);
- } else {
- LOGGER.error("Time string format error.It only supports d(day),h(hour),m(minute),s(second),ms(millsecond).Please check your time format.)");
- throw new IllegalArgumentException();
- }
- } catch (Exception e) {
- LOGGER.error("Parse exception occur. {}",e);
+ if (str.endsWith("ms")) {
+ return milliseconds(Long.parseLong(str.substring(0, str.length() - 2)), TimeUnit.MILLISECONDS);
+ } else if (str.endsWith("s")) {
+ return milliseconds(Long.parseLong(str.substring(0, str.length() - 1)), TimeUnit.SECONDS);
+ } else if (str.endsWith("m")) {
+ return milliseconds(Long.parseLong(str.substring(0, str.length() - 1)), TimeUnit.MINUTES);
+ } else if (str.endsWith("h")) {
+ return milliseconds(Long.parseLong(str.substring(0, str.length() - 1)), TimeUnit.HOURS);
+ } else if (str.endsWith("d")) {
+ return milliseconds(Long.parseLong(str.substring(0, str.length() - 1)), TimeUnit.DAYS);
+ } else {
+ LOGGER.error("Time string format error.It only supports d(day),h(hour),m(minute),s(second),ms(millsecond).Please check your time format.)");
return 0L;
}
}
@@ -92,7 +84,7 @@ public class TimeUtil {
return unit.toMillis(duration);
}
- public static String format(String timeFormat, long time) {
+ public static String format(String timeFormat, long time,String timeZone) {
String timePattern = "#(?:\\\\#|[^#])*#";
Date t = new Date(time);
Pattern ptn = Pattern.compile(timePattern);
@@ -103,11 +95,11 @@ public class TimeUtil {
String content = group.substring(1, group.length() - 1);
String pattern = refreshEscapeHashTag(content);
SimpleDateFormat sdf = new SimpleDateFormat(pattern);
+ sdf.setTimeZone(TimeZone.getTimeZone(timeZone));
matcher.appendReplacement(sb, sdf.format(t));
}
matcher.appendTail(sb);
- String endString = refreshEscapeHashTag(sb.toString());
- return endString;
+ return refreshEscapeHashTag(sb.toString());
}
private static String refreshEscapeHashTag(String str) {
[2/3] incubator-griffin git commit: fix transaction and schedule bug
and update ut
Posted by gu...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/service/src/main/resources/application.properties b/service/src/main/resources/application.properties
index b3f5e94..c7b57be 100644
--- a/service/src/main/resources/application.properties
+++ b/service/src/main/resources/application.properties
@@ -42,11 +42,11 @@ kafka.schema.registry.url = http://localhost:8081
# Update job instance state at regular intervals
jobInstance.fixedDelay.in.milliseconds = 60000
-# Expired time of job instance which is 7 days that is 604800000 milliseconds
+# Expired time of job instance which is 7 days that is 604800000 milliseconds.Time unit only supports milliseconds
jobInstance.expired.milliseconds = 604800000
# schedule predicate job every 5 minutes and repeat 12 times at most
-#interval unit m:minute h:hour d:day,only support these three units
+#interval time unit s:second m:minute h:hour d:day,only support these four units
predicate.job.interval = 5m
predicate.job.repeat.count = 12
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/resources/sparkJob.properties
----------------------------------------------------------------------
diff --git a/service/src/main/resources/sparkJob.properties b/service/src/main/resources/sparkJob.properties
index f9fd2f9..a9be693 100644
--- a/service/src/main/resources/sparkJob.properties
+++ b/service/src/main/resources/sparkJob.properties
@@ -18,13 +18,13 @@
#
# spark required
-sparkJob.file=hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/jar/griffin-measure.jar
+sparkJob.file=hdfs:///griffin/griffin-measure.jar
sparkJob.className=org.apache.griffin.measure.Application
-sparkJob.args_1=hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/conf/env.json
+sparkJob.args_1=hdfs:///griffin/json/env.json
sparkJob.args_3=hdfs,raw
sparkJob.name=griffin
-sparkJob.queue=hdlq-gdi-sla
+sparkJob.queue=default
# options
sparkJob.numExecutors=10
@@ -33,12 +33,12 @@ sparkJob.driverMemory=2g
sparkJob.executorMemory=2g
# shouldn't config in server, but in
-sparkJob.jars = hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/livy/spark-avro_2.11-2.0.1.jar;\
- hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/livy/datanucleus-api-jdo-3.2.6.jar;\
- hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/livy/datanucleus-core-3.2.10.jar;\
- hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/livy/datanucleus-rdbms-3.2.9.jar
+sparkJob.jars = hdfs://livy/spark-avro_2.11-2.0.1.jar;\
+ hdfs://livy/datanucleus-api-jdo-3.2.6.jar;\
+ hdfs://livy/datanucleus-core-3.2.10.jar;\
+ hdfs://livy/datanucleus-rdbms-3.2.9.jar
-spark.yarn.dist.files = hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/livy/hive-site.xml
+spark.yarn.dist.files = hdfs://livy/hive-site.xml
# livy
# livy.uri=http://10.9.246.187:8998/batches
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/config/PropertiesConfigTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/config/PropertiesConfigTest.java b/service/src/test/java/org/apache/griffin/core/config/PropertiesConfigTest.java
new file mode 100644
index 0000000..65a8a1d
--- /dev/null
+++ b/service/src/test/java/org/apache/griffin/core/config/PropertiesConfigTest.java
@@ -0,0 +1,160 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.config;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.io.FileNotFoundException;
+import java.util.Properties;
+
+import static org.junit.Assert.*;
+
+@RunWith(SpringRunner.class)
+//@TestPropertySource("classpath")
+public class PropertiesConfigTest {
+
+ @TestConfiguration
+ public static class PropertiesConf {
+
+ @Bean(name = "noLivyConf")
+ public PropertiesConfig noSparkConf() {
+ return new PropertiesConfig(null);
+ }
+
+ @Bean(name = "livyConf")
+ public PropertiesConfig sparkConf() {
+ return new PropertiesConfig("src/test/resources");
+ }
+
+ @Bean(name = "livyNotFoundConfig")
+ public PropertiesConfig sparkNotFoundConfig() {
+ return new PropertiesConfig("test");
+ }
+
+ @Bean(name = "noQuartzConf")
+ public PropertiesConfig noQuartzConf() {
+ return new PropertiesConfig(null);
+ }
+
+ @Bean(name = "quartzConf")
+ public PropertiesConfig quartzConf() {
+ return new PropertiesConfig("src/test/resources");
+ }
+
+ @Bean(name = "quartzNotFoundConfig")
+ public PropertiesConfig quartzNotFoundConfig() {
+ return new PropertiesConfig("test");
+ }
+ }
+
+ @Autowired
+ @Qualifier(value = "noLivyConf")
+ private PropertiesConfig noLivyConf;
+
+ @Autowired
+ @Qualifier(value = "livyConf")
+ private PropertiesConfig livyConf;
+
+ @Autowired
+ @Qualifier(value = "livyNotFoundConfig")
+ private PropertiesConfig livyNotFoundConfig;
+
+
+ @Autowired
+ @Qualifier(value = "noQuartzConf")
+ private PropertiesConfig noQuartzConf;
+
+ @Autowired
+ @Qualifier(value = "quartzConf")
+ private PropertiesConfig quartzConf;
+
+ @Autowired
+ @Qualifier(value = "quartzNotFoundConfig")
+ private PropertiesConfig quartzNotFoundConfig;
+
+ @Test
+ public void appConf() throws Exception {
+ Properties conf = noLivyConf.appConf();
+ assertEquals(conf.get("spring.datasource.username"),"test");
+ }
+
+ @Test
+ public void livyConfWithLocationNotNull() throws Exception {
+ Properties conf = livyConf.livyConf();
+ assertEquals(conf.get("sparkJob.name"),"test");
+ }
+
+ @Test
+ public void livyConfWithLocationNull() throws Exception {
+ Properties conf = noLivyConf.livyConf();
+ assertEquals(conf.get("sparkJob.name"),"test");
+ }
+
+ @Test
+ public void livyConfWithFileNotFoundException() throws Exception {
+ FileNotFoundException e = livyFileNotFoundException();
+ assert e != null;
+ }
+
+ @Test
+ public void quartzConfWithLocationNotNull() throws Exception {
+ Properties conf = quartzConf.quartzConf();
+ assertEquals(conf.get("org.quartz.scheduler.instanceName"),"spring-boot-quartz-test");
+ }
+
+ @Test
+ public void quartzConfWithLocationNull() throws Exception {
+ Properties conf = noQuartzConf.quartzConf();
+ assertEquals(conf.get("org.quartz.scheduler.instanceName"),"spring-boot-quartz-test");
+ }
+
+ @Test
+ public void quartzConfWithFileNotFoundException() throws Exception {
+ FileNotFoundException e = quartzFileNotFoundException();
+ assert e != null;
+ }
+
+ private FileNotFoundException livyFileNotFoundException() {
+ FileNotFoundException exception = null;
+ try {
+ livyNotFoundConfig.livyConf();
+ } catch (FileNotFoundException e) {
+ exception = e;
+ }
+ return exception;
+ }
+
+ private FileNotFoundException quartzFileNotFoundException() {
+ FileNotFoundException exception = null;
+ try {
+ quartzNotFoundConfig.livyConf();
+ } catch (FileNotFoundException e) {
+ exception = e;
+ }
+ return exception;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/job/JobInstanceTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/job/JobInstanceTest.java b/service/src/test/java/org/apache/griffin/core/job/JobInstanceTest.java
new file mode 100644
index 0000000..6a39685
--- /dev/null
+++ b/service/src/test/java/org/apache/griffin/core/job/JobInstanceTest.java
@@ -0,0 +1,186 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job;
+
+import org.apache.griffin.core.job.entity.*;
+import org.apache.griffin.core.job.repo.GriffinJobRepo;
+import org.apache.griffin.core.job.repo.JobScheduleRepo;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
+import org.apache.griffin.core.measure.repo.GriffinMeasureRepo;
+import org.apache.griffin.core.util.JsonUtil;
+import org.apache.griffin.core.util.PropertiesUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.quartz.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.scheduling.quartz.SchedulerFactoryBean;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import javax.validation.constraints.AssertTrue;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.griffin.core.util.EntityHelper.*;
+import static org.junit.Assert.*;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
+
+@RunWith(SpringRunner.class)
+public class JobInstanceTest {
+
+ @TestConfiguration
+ public static class jobInstanceBean{
+ @Bean
+ public JobInstance instance() {
+ return new JobInstance();
+ }
+
+ @Bean(name = "appConf")
+ public Properties sparkJobProps() {
+ String path = "application.properties";
+ return PropertiesUtil.getProperties(path, new ClassPathResource(path));
+ }
+
+ @Bean
+ public SchedulerFactoryBean factoryBean() {
+ return new SchedulerFactoryBean();
+ }
+ }
+
+ @Autowired
+ private JobInstance jobInstance;
+
+ @Autowired
+ @Qualifier("appConf")
+ private Properties appConfProps;
+
+ @MockBean
+ private SchedulerFactoryBean factory;
+
+ @MockBean
+ private GriffinMeasureRepo measureRepo;
+
+ @MockBean
+ private GriffinJobRepo jobRepo;
+
+ @MockBean
+ private JobScheduleRepo jobScheduleRepo;
+
+
+
+ @Test
+ public void testExecute() throws Exception {
+ JobExecutionContext context = mock(JobExecutionContext.class);
+ Scheduler scheduler = mock(Scheduler.class);
+ GriffinMeasure measure = createGriffinMeasure("measureName");
+ JobDetail jd = createJobDetail(JsonUtil.toJson(measure), "");
+ JobSchedule jobSchedule = createJobSchedule();
+ GriffinJob job = new GriffinJob(1L, "jobName", "qName", "qGroup", false);
+ List<Trigger> triggers = Arrays.asList(createSimpleTrigger(2, 0));
+ given(context.getJobDetail()).willReturn(jd);
+ given(jobScheduleRepo.findOne(Matchers.anyLong())).willReturn(jobSchedule);
+ given(measureRepo.findOne(Matchers.anyLong())).willReturn(measure);
+ given(jobRepo.findOne(Matchers.anyLong())).willReturn(job);
+ given(factory.getObject()).willReturn(scheduler);
+ given((List<Trigger>)scheduler.getTriggersOfJob(Matchers.any(JobKey.class))).willReturn(triggers);
+ given(scheduler.checkExists(Matchers.any(TriggerKey.class))).willReturn(false);
+ given(jobRepo.save(Matchers.any(GriffinJob.class))).willReturn(job);
+ given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(false);
+ jobInstance.execute(context);
+ }
+
+ @Test
+ public void testExecuteWithRangeLessThanZero() throws Exception {
+ JobExecutionContext context = mock(JobExecutionContext.class);
+ Scheduler scheduler = mock(Scheduler.class);
+ GriffinMeasure measure = createGriffinMeasure("measureName");
+ JobDetail jd = createJobDetail(JsonUtil.toJson(measure), "");
+ JobSchedule jobSchedule = createJobSchedule("jobName",new SegmentRange("-1h","-1h"));
+ GriffinJob job = new GriffinJob(1L, "jobName", "qName", "qGroup", false);
+ List<Trigger> triggers = Arrays.asList(createSimpleTrigger(2, 0));
+ given(context.getJobDetail()).willReturn(jd);
+ given(jobScheduleRepo.findOne(Matchers.anyLong())).willReturn(jobSchedule);
+ given(measureRepo.findOne(Matchers.anyLong())).willReturn(measure);
+ given(jobRepo.findOne(Matchers.anyLong())).willReturn(job);
+ given(factory.getObject()).willReturn(scheduler);
+ given((List<Trigger>)scheduler.getTriggersOfJob(Matchers.any(JobKey.class))).willReturn(triggers);
+ given(scheduler.checkExists(Matchers.any(TriggerKey.class))).willReturn(false);
+ given(jobRepo.save(Matchers.any(GriffinJob.class))).willReturn(job);
+ given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(false);
+ jobInstance.execute(context);
+ }
+
+ @Test
+ public void testExecuteWithRangeGreaterThanDataUnit() throws Exception {
+ JobExecutionContext context = mock(JobExecutionContext.class);
+ Scheduler scheduler = mock(Scheduler.class);
+ GriffinMeasure measure = createGriffinMeasure("measureName");
+ JobDetail jd = createJobDetail(JsonUtil.toJson(measure), "");
+ JobSchedule jobSchedule = createJobSchedule("jobName",new SegmentRange("-1h","5h"));
+ GriffinJob job = new GriffinJob(1L, "jobName", "qName", "qGroup", false);
+ List<Trigger> triggers = Arrays.asList(createSimpleTrigger(2, 0));
+ given(context.getJobDetail()).willReturn(jd);
+ given(jobScheduleRepo.findOne(Matchers.anyLong())).willReturn(jobSchedule);
+ given(measureRepo.findOne(Matchers.anyLong())).willReturn(measure);
+ given(jobRepo.findOne(Matchers.anyLong())).willReturn(job);
+ given(factory.getObject()).willReturn(scheduler);
+ given((List<Trigger>)scheduler.getTriggersOfJob(Matchers.any(JobKey.class))).willReturn(triggers);
+ given(scheduler.checkExists(Matchers.any(TriggerKey.class))).willReturn(false);
+ given(jobRepo.save(Matchers.any(GriffinJob.class))).willReturn(job);
+ given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(false);
+ jobInstance.execute(context);
+ }
+
+ @Test
+ public void testExecuteWithPredicate() throws Exception {
+ JobExecutionContext context = mock(JobExecutionContext.class);
+ Scheduler scheduler = mock(Scheduler.class);
+ GriffinMeasure measure = createGriffinMeasure("measureName",createFileExistPredicate(),createFileExistPredicate());
+ JobDetail jd = createJobDetail(JsonUtil.toJson(measure), "");
+ JobSchedule jobSchedule = createJobSchedule("jobName");
+ GriffinJob job = new GriffinJob(1L, "jobName", "qName", "qGroup", false);
+ List<Trigger> triggers = Arrays.asList(createSimpleTrigger(2, 0));
+ given(context.getJobDetail()).willReturn(jd);
+ given(jobScheduleRepo.findOne(Matchers.anyLong())).willReturn(jobSchedule);
+ given(measureRepo.findOne(Matchers.anyLong())).willReturn(measure);
+ given(jobRepo.findOne(Matchers.anyLong())).willReturn(job);
+ given(factory.getObject()).willReturn(scheduler);
+ given((List<Trigger>)scheduler.getTriggersOfJob(Matchers.any(JobKey.class))).willReturn(triggers);
+ given(scheduler.checkExists(Matchers.any(TriggerKey.class))).willReturn(false);
+ given(jobRepo.save(Matchers.any(GriffinJob.class))).willReturn(job);
+ given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(false);
+ jobInstance.execute(context);
+ }
+
+ @Test
+ public void testExecuteWithNullException() throws Exception {
+ JobExecutionContext context = mock(JobExecutionContext.class);
+ jobInstance.execute(context);
+ assertTrue(true);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java b/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java
index 988a188..d529753 100644
--- a/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java
+++ b/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java
@@ -1,374 +1,560 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//
-//package org.apache.griffin.core.job;
-//
-//import org.apache.griffin.core.error.exception.GriffinException;
-//import org.apache.griffin.core.job.entity.GriffinJob;
-//import org.apache.griffin.core.job.entity.JobInstanceBean;
-//import org.apache.griffin.core.job.entity.LivySessionStates;
-//import org.apache.griffin.core.job.repo.JobInstanceRepo;
-//import org.apache.griffin.core.job.repo.JobRepo;
-//import org.apache.griffin.core.job.repo.JobScheduleRepo;
-//import org.apache.griffin.core.measure.repo.MeasureRepo;
-//import org.apache.griffin.core.util.GriffinOperationMessage;
-//import org.apache.griffin.core.util.PropertiesUtil;
-//import org.junit.Before;
-//import org.junit.Test;
-//import org.junit.runner.RunWith;
-//import org.mockito.Matchers;
-//import org.mockito.Mockito;
-//import org.mockito.internal.util.reflection.Whitebox;
-//import org.quartz.*;
-//import org.quartz.impl.JobDetailImpl;
-//import org.quartz.impl.triggers.SimpleTriggerImpl;
-//import org.springframework.beans.factory.annotation.Autowired;
-//import org.springframework.boot.test.context.TestConfiguration;
-//import org.springframework.boot.test.mock.mockito.MockBean;
-//import org.springframework.context.annotation.Bean;
-//import org.springframework.core.io.ClassPathResource;
-//import org.springframework.scheduling.quartz.SchedulerFactoryBean;
-//import org.springframework.test.context.junit4.SpringRunner;
-//import org.springframework.web.client.RestTemplate;
-//
-//import java.util.*;
-//
-//import static org.junit.Assert.assertEquals;
-//import static org.mockito.BDDMockito.given;
-//import static org.mockito.Mockito.doNothing;
-//import static org.mockito.Mockito.mock;
-//import static org.quartz.TriggerBuilder.newTrigger;
-//
-//@RunWith(SpringRunner.class)
-//public class JobServiceImplTest {
-//
-// @TestConfiguration
-// public static class SchedulerServiceConfiguration {
-// @Bean
-// public JobServiceImpl service() {
-// return new JobServiceImpl();
-// }
-//
-// @Bean
-// public SchedulerFactoryBean factoryBean() {
-// return new SchedulerFactoryBean();
-// }
-// }
-//
-// @MockBean
-// private JobScheduleRepo jobScheduleRepo;
-//
-// @MockBean
-// private MeasureRepo measureRepo;
-//
-// @MockBean
-// private JobRepo<GriffinJob> jobRepo;
-// @MockBean
-// private JobInstanceRepo jobInstanceRepo;
-//
-// @MockBean
-// private SchedulerFactoryBean factory;
-//
-// @MockBean
-// private Properties sparkJobProps;
-//
-// @MockBean
-// private RestTemplate restTemplate;
-//
-// @Autowired
-// private JobServiceImpl service;
-//
-//
-// @Before
-// public void setup() {
-//
-// }
-//
-// @Test
-// public void testGetAliveJobsForNormalRun() throws SchedulerException {
-// Scheduler scheduler = Mockito.mock(Scheduler.class);
-// GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", "quartzGroupName", false);
-// given(factory.getObject()).willReturn(scheduler);
-// given(jobRepo.findByDeleted(false)).willReturn(Arrays.asList(job));
-// JobKey jobKey = new JobKey(job.getQuartzName(), job.getQuartzGroup());
-// SimpleTrigger trigger = new SimpleTriggerImpl();
-// List<Trigger> triggers = new ArrayList<>();
-// triggers.add(trigger);
-// given((List<Trigger>) scheduler.getTriggersOfJob(jobKey)).willReturn(triggers);
-// assertEquals(service.getAliveJobs().size(), 1);
-// }
-//
-// @Test
-// public void testGetAliveJobsForNoJobsWithTriggerEmpty() throws SchedulerException {
-// Scheduler scheduler = Mockito.mock(Scheduler.class);
-// GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", "quartzGroupName", false);
-// given(factory.getObject()).willReturn(scheduler);
-// given(jobRepo.findByDeleted(false)).willReturn(Arrays.asList(job));
-// JobKey jobKey = new JobKey(job.getQuartzName(), job.getQuartzGroup());
-// List<Trigger> triggers = new ArrayList<>();
-// given((List<Trigger>) scheduler.getTriggersOfJob(jobKey)).willReturn(triggers);
-// assertEquals(service.getAliveJobs().size(), 0);
-// }
-//
-//
-//// @Test
-//// public void testAddJobForSuccess() throws Exception {
-//// JobRequestBody jobRequestBody = new JobRequestBody("YYYYMMdd-HH", "YYYYMMdd-HH",
-//// String.valueOf(System.currentTimeMillis()), String.valueOf(System.currentTimeMillis()), "1000");
-//// Scheduler scheduler = Mockito.mock(Scheduler.class);
-//// given(factory.getObject()).willReturn(scheduler);
-//// given(measureRepo.findOne(1L)).willReturn(createATestGriffinMeasure("measureName","org"));
-//// assertEquals(service.addJob("BA", "jobName", 1L, jobRequestBody), GriffinOperationMessage.CREATE_JOB_SUCCESS);
-//// }
-////
-//// @Test
-//// public void testAddJobForFailWithFormatError() {
-//// JobRequestBody jobRequestBody = new JobRequestBody();
-//// Scheduler scheduler = Mockito.mock(Scheduler.class);
-//// given(factory.getObject()).willReturn(scheduler);
-//// assertEquals(service.addJob("BA", "jobName", 0L, jobRequestBody), GriffinOperationMessage.CREATE_JOB_FAIL);
-//// }
-////
-//// @Test
-//// public void testAddJobForFailWithTriggerKeyExist() throws SchedulerException {
-//// String groupName = "BA";
-//// String jobName = "jobName";
-//// JobRequestBody jobRequestBody = new JobRequestBody("YYYYMMdd-HH", "YYYYMMdd-HH",
-//// String.valueOf(System.currentTimeMillis()), String.valueOf(System.currentTimeMillis()), "1000");
-//// Scheduler scheduler = Mockito.mock(Scheduler.class);
-//// given(factory.getObject()).willReturn(scheduler);
-//// given(scheduler.checkExists(TriggerKey.triggerKey(jobName, groupName))).willReturn(true);
-//// assertEquals(service.addJob(groupName, jobName, 0L, jobRequestBody), GriffinOperationMessage.CREATE_JOB_FAIL);
-//// }
-////
-//// @Test
-//// public void testAddJobForFailWithScheduleException() throws SchedulerException {
-//// String groupName = "BA";
-//// String jobName = "jobName";
-//// JobRequestBody jobRequestBody = new JobRequestBody("YYYYMMdd-HH", "YYYYMMdd-HH",
-//// String.valueOf(System.currentTimeMillis()), String.valueOf(System.currentTimeMillis()), "1000");
-//// Scheduler scheduler = Mockito.mock(Scheduler.class);
-//// given(factory.getObject()).willReturn(scheduler);
-//// Trigger trigger = newTrigger().withIdentity(TriggerKey.triggerKey(jobName, groupName)).build();
-//// given(scheduler.scheduleJob(trigger)).willThrow(SchedulerException.class);
-//// assertEquals(service.addJob(groupName, jobName, 0L, jobRequestBody), GriffinOperationMessage.CREATE_JOB_FAIL);
-//// }
-//
-// @Test
-// public void testDeleteJobForJobIdSuccess() throws SchedulerException {
-// Long jobId = 1L;
-//// GriffinJob job = new GriffinJob(1L, "jobName", "quartzJobName", "quartzGroupName", "pJobName", "pGroupName", false);
-//// Scheduler scheduler = Mockito.mock(Scheduler.class);
-//// JobKey jobKey = new JobKey(job.getQuartzJobName(), job.getQuartzGroupName());
-//// JobKey pJobKey = new JobKey(job.getJobName(), job.getGroupName());
-//// given(factory.getObject()).willReturn(scheduler);
-//// given(scheduler.checkExists(pJobKey)).willReturn(true);
-//// given(scheduler.checkExists(jobKey)).willReturn(true);
-//// doNothing().when(scheduler).pauseJob(pJobKey);
-//// doNothing().when(scheduler).pauseJob(jobKey);
-//// given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(job);
-//// assertEquals(service.deleteJob(jobId), GriffinOperationMessage.DELETE_JOB_SUCCESS);
-// }
-//
-// @Test
-// public void testDeleteJobForJobIdFailureWithNull() throws SchedulerException {
-// Long jobId = 1L;
-// given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(null);
-// assertEquals(service.deleteJob(jobId), GriffinOperationMessage.DELETE_JOB_FAIL);
-// }
-//
-// @Test
-// public void testDeleteJobForJobIdFailureWithTriggerNotExist() throws SchedulerException {
-// Long jobId = 1L;
-// GriffinJob job = new GriffinJob(1L, "jobName", "quartzJobName", "quartzGroupName", false);
-// Scheduler scheduler = Mockito.mock(Scheduler.class);
-// JobKey jobKey = new JobKey(job.getQuartzName(), job.getQuartzGroup());
-// given(factory.getObject()).willReturn(scheduler);
-// given(scheduler.checkExists(jobKey)).willReturn(false);
-// assertEquals(service.deleteJob(jobId), GriffinOperationMessage.DELETE_JOB_FAIL);
-// }
-//
-//
-// @Test
-// public void testDeleteJobForJobNameSuccess() throws SchedulerException {
-// GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", "quartzGroupName", false);
-// Scheduler scheduler = Mockito.mock(Scheduler.class);
-// JobKey jobKey = new JobKey(job.getQuartzName(), job.getQuartzGroup());
-//// given(jobRepo.findByJobNameAndDeleted(job.getJobName(), false)).willReturn(Arrays.asList(job));
-// given(factory.getObject()).willReturn(scheduler);
-// given(scheduler.checkExists(jobKey)).willReturn(true);
-// doNothing().when(scheduler).pauseJob(jobKey);
-// assertEquals(service.deleteJob(job.getJobName()), GriffinOperationMessage.DELETE_JOB_SUCCESS);
-// }
-//
-// @Test
-// public void testDeleteJobForJobNameFailureWithNull() throws SchedulerException {
-// String jobName = "jobName";
-//// given(jobRepo.findByJobNameAndDeleted(jobName, false)).willReturn(new ArrayList<>());
-// assertEquals(service.deleteJob(jobName), GriffinOperationMessage.DELETE_JOB_FAIL);
-// }
-//
-// @Test
-// public void testDeleteJobForJobNameFailureWithTriggerNotExist() throws SchedulerException {
-// GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", "quartzGroupName", false);
-// Scheduler scheduler = Mockito.mock(Scheduler.class);
-// JobKey jobKey = new JobKey(job.getQuartzName(), job.getQuartzGroup());
-//// given(jobRepo.findByJobNameAndDeleted(job.getJobName(), false)).willReturn(Arrays.asList(job));
-// given(factory.getObject()).willReturn(scheduler);
-// given(scheduler.checkExists(jobKey)).willReturn(false);
-// assertEquals(service.deleteJob(job.getJobName()), GriffinOperationMessage.DELETE_JOB_FAIL);
-// }
-//
-//// @Test
-//// public void testFindInstancesOfJobForSuccess() throws SchedulerException {
-//// Long jobId = 1L;
-//// int page = 0;
-//// int size = 2;
-//// GriffinJob job = new GriffinJob(1L, "jobName", "quartzJobName", "quartzGroupName", false);
-//// JobInstanceBean jobInstance = new JobInstanceBean(1L, LivySessionStates.State.dead, "app_id", "app_uri", System.currentTimeMillis(), System.currentTimeMillis());
-//// Pageable pageRequest = new PageRequest(page, size, Sort.Direction.DESC, "timestamp");
-//// given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(job);
-//// given(jobInstanceRepo.findByJobId(1L, pageRequest)).willReturn(Arrays.asList(jobInstance));
-//// assertEquals(service.findInstancesOfJob(1L, page, size).size(), 1);
-//// }
-////
-//// @Test
-//// public void testFindInstancesOfJobForNull() throws SchedulerException {
-//// Long jobId = 1L;
-//// given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(null);
-//// assertEquals(service.findInstancesOfJob(jobId, 0, 2).size(), 0);
-//// }
-////
-//// @Test
-//// public void testSyncInstancesOfJobForSuccess() {
-//// JobInstanceBean instance = createJobInstance();
-//// given(jobInstanceRepo.findByActiveState()).willReturn(Arrays.asList(instance));
-//// Whitebox.setInternalState(service, "restTemplate", restTemplate);
-//// String result = "{\"id\":1,\"state\":\"starting\",\"appId\":123,\"appInfo\":{\"driverLogUrl\":null,\"sparkUiUrl\":null},\"log\":[]}";
-//// given(restTemplate.getForObject(Matchers.anyString(), Matchers.any())).willReturn(result);
-//// service.syncInstancesOfAllJobs();
-//// }
-//
-// @Test
-// public void testSyncInstancesOfJobForRestClientException() {
-// JobInstanceBean instance = createJobInstance();
-// instance.setSessionId(1234564L);
-// String path = "/sparkJob.properties";
-// given(jobInstanceRepo.findByActiveState()).willReturn(Arrays.asList(instance));
-// given(sparkJobProps.getProperty("livy.uri")).willReturn(PropertiesUtil.getProperties(path,new ClassPathResource(path)).getProperty("livy.uri"));
-// service.syncInstancesOfAllJobs();
-// }
-//
-// @Test
-// public void testSyncInstancesOfJobForIOException() throws Exception {
-// JobInstanceBean instance = createJobInstance();
-// given(jobInstanceRepo.findByActiveState()).willReturn(Arrays.asList(instance));
-// Whitebox.setInternalState(service, "restTemplate", restTemplate);
-// given(restTemplate.getForObject(Matchers.anyString(), Matchers.any())).willReturn("result");
-// service.syncInstancesOfAllJobs();
-// }
-//
-// @Test
-// public void testSyncInstancesOfJobForIllegalArgumentException() throws Exception {
-// JobInstanceBean instance = createJobInstance();
-// given(jobInstanceRepo.findByActiveState()).willReturn(Arrays.asList(instance));
-// Whitebox.setInternalState(service, "restTemplate", restTemplate);
-// given(restTemplate.getForObject(Matchers.anyString(), Matchers.any())).willReturn("{\"state\":\"wrong\"}");
-// service.syncInstancesOfAllJobs();
-// }
-//
-//// @Test
-//// public void testGetHealthInfoWithHealthy() throws SchedulerException {
-//// Scheduler scheduler = Mockito.mock(Scheduler.class);
-//// GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", "quartzGroupName", false);
-//// given(factory.getObject()).willReturn(scheduler);
-//// given(jobRepo.findByDeleted(false)).willReturn(Arrays.asList(job));
-//// JobKey jobKey = new JobKey(job.getQuartzJobName(), job.getQuartzGroupName());
-//// SimpleTrigger trigger = new SimpleTriggerImpl();
-//// List<Trigger> triggers = new ArrayList<>();
-//// triggers.add(trigger);
-//// given((List<Trigger>) scheduler.getTriggersOfJob(jobKey)).willReturn(triggers);
-////
-//// Pageable pageRequest = new PageRequest(0, 1, Sort.Direction.DESC, "timestamp");
-//// List<JobInstanceBean> scheduleStateList = new ArrayList<>();
-//// scheduleStateList.add(createJobInstance());
-//// given(jobInstanceRepo.findByJobId(1L, pageRequest)).willReturn(scheduleStateList);
-//// assertEquals(service.getHealthInfo().getHealthyJobCount(), 1);
-////
-//// }
-////
-//// @Test
-//// public void testGetHealthInfoWithUnhealthy() throws SchedulerException {
-//// Scheduler scheduler = Mockito.mock(Scheduler.class);
-//// GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", "quartzGroupName", false);
-//// given(factory.getObject()).willReturn(scheduler);
-//// given(jobRepo.findByDeleted(false)).willReturn(Arrays.asList(job));
-//// JobKey jobKey = new JobKey(job.getQuartzJobName(), job.getQuartzGroupName());
-//// SimpleTrigger trigger = new SimpleTriggerImpl();
-//// List<Trigger> triggers = new ArrayList<>();
-//// triggers.add(trigger);
-//// given((List<Trigger>) scheduler.getTriggersOfJob(jobKey)).willReturn(triggers);
-////
-//// Pageable pageRequest = new PageRequest(0, 1, Sort.Direction.DESC, "timestamp");
-//// List<JobInstanceBean> scheduleStateList = new ArrayList<>();
-//// JobInstanceBean instance = createJobInstance();
-//// instance.setState(LivySessionStates.State.error);
-//// scheduleStateList.add(instance);
-//// given(jobInstanceRepo.findByJobId(1L, pageRequest)).willReturn(scheduleStateList);
-//// assertEquals(service.getHealthInfo().getHealthyJobCount(), 0);
-//// }
-//
-// private void mockJsonDataMap(Scheduler scheduler, JobKey jobKey, Boolean deleted) throws SchedulerException {
-// JobDataMap jobDataMap = mock(JobDataMap.class);
-// JobDetailImpl jobDetail = new JobDetailImpl();
-// jobDetail.setJobDataMap(jobDataMap);
-// given(scheduler.getJobDetail(jobKey)).willReturn(jobDetail);
-// given(jobDataMap.getBooleanFromString("deleted")).willReturn(deleted);
-// }
-//
-// private Trigger newTriggerInstance(String name, String group, int internalInSeconds) {
-// return newTrigger().withIdentity(TriggerKey.triggerKey(name, group)).
-// withSchedule(SimpleScheduleBuilder.simpleSchedule()
-// .withIntervalInSeconds(internalInSeconds)
-// .repeatForever()).startAt(new Date()).build();
-// }
-//
-//
-// private GriffinException.GetJobsFailureException getTriggersOfJobExpectException(Scheduler scheduler, JobKey jobKey) {
-// GriffinException.GetJobsFailureException exception = null;
-// try {
-// given(scheduler.getTriggersOfJob(jobKey)).willThrow(new GriffinException.GetJobsFailureException());
-// service.getAliveJobs();
-// } catch (GriffinException.GetJobsFailureException e) {
-// exception = e;
-// } catch (SchedulerException e) {
-// e.printStackTrace();
-// }
-// return exception;
-// }
-//
-// private JobInstanceBean createJobInstance() {
-// JobInstanceBean jobBean = new JobInstanceBean();
-// jobBean.setSessionId(1L);
-// jobBean.setState(LivySessionStates.State.starting);
-// jobBean.setAppId("app_id");
-// jobBean.setTms(System.currentTimeMillis());
-// return jobBean;
-// }
-//}
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job;
+
+import org.apache.griffin.core.error.exception.GriffinException;
+import org.apache.griffin.core.job.entity.*;
+import org.apache.griffin.core.job.repo.GriffinJobRepo;
+import org.apache.griffin.core.job.repo.JobInstanceRepo;
+import org.apache.griffin.core.job.repo.JobScheduleRepo;
+import org.apache.griffin.core.measure.entity.DataConnector;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
+import org.apache.griffin.core.measure.repo.GriffinMeasureRepo;
+import org.apache.griffin.core.util.GriffinOperationMessage;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.quartz.*;
+import org.quartz.impl.triggers.SimpleTriggerImpl;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.data.domain.PageRequest;
+import org.springframework.data.domain.Pageable;
+import org.springframework.data.domain.Sort;
+import org.springframework.scheduling.quartz.SchedulerFactoryBean;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.griffin.core.util.EntityHelper.*;
+import static org.apache.griffin.core.util.GriffinOperationMessage.*;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.doThrow;
+
+@RunWith(SpringRunner.class)
+public class JobServiceImplTest {
+
+ @TestConfiguration
+ public static class SchedulerServiceConfiguration {
+ @Bean
+ public JobServiceImpl service() {
+ return new JobServiceImpl();
+ }
+
+ @Bean
+ public SchedulerFactoryBean factoryBean() {
+ return new SchedulerFactoryBean();
+ }
+ }
+
+ @MockBean
+ private JobScheduleRepo jobScheduleRepo;
+
+ @MockBean
+ private GriffinMeasureRepo griffinMeasureRepo;
+
+ @MockBean
+ private GriffinJobRepo jobRepo;
+
+ @MockBean
+ private JobInstanceRepo jobInstanceRepo;
+
+ @MockBean
+ private SchedulerFactoryBean factory;
+
+ @MockBean(name = "livyConf")
+ private Properties sparkJobProps;
+
+ @MockBean
+ private RestTemplate restTemplate;
+
+ @Autowired
+ private JobServiceImpl service;
+
+
+ @Before
+ public void setup() {
+
+ }
+
+ @Test
+ public void testGetAliveJobsForSuccess() throws SchedulerException {
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", "quartzGroupName", false);
+ given(factory.getObject()).willReturn(scheduler);
+ given(jobRepo.findByDeleted(false)).willReturn(Arrays.asList(job));
+ SimpleTrigger trigger = new SimpleTriggerImpl();
+ given((List<Trigger>) scheduler.getTriggersOfJob(Matchers.any(JobKey.class))).willReturn(Arrays.asList(trigger));
+ assertEquals(service.getAliveJobs().size(), 1);
+ }
+
+ @Test
+ public void testGetAliveJobsForNoJobsWithTriggerEmpty() throws SchedulerException {
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", "quartzGroupName", false);
+ given(factory.getObject()).willReturn(scheduler);
+ given(jobRepo.findByDeleted(false)).willReturn(Arrays.asList(job));
+ given((List<Trigger>) scheduler.getTriggersOfJob(Matchers.any(JobKey.class))).willReturn(new ArrayList<>());
+ assertEquals(service.getAliveJobs().size(), 0);
+ }
+
+ @Test
+ public void testGetAliveJobsForNoJobsWithException() throws SchedulerException {
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", "quartzGroupName", false);
+ given(factory.getObject()).willReturn(scheduler);
+ given(jobRepo.findByDeleted(false)).willReturn(Arrays.asList(job));
+ GriffinException.GetJobsFailureException exception = getExceptionForGetAliveJObs(scheduler);
+ assert exception != null;
+ }
+
+
+ @Test
+ public void testAddJobForSuccess() throws Exception {
+ JobSchedule js = createJobSchedule();
+ js.setId(1L);
+ GriffinMeasure measure = createGriffinMeasure("measureName");
+ GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", "quartzGroupName", false);
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ given(factory.getObject()).willReturn(scheduler);
+ given(griffinMeasureRepo.findByIdAndDeleted(js.getMeasureId(), false)).willReturn(measure);
+ given(jobRepo.countByJobNameAndDeleted(js.getJobName(), false)).willReturn(0);
+ given(jobScheduleRepo.save(js)).willReturn(js);
+ given(jobRepo.save(Matchers.any(GriffinJob.class))).willReturn(job);
+ GriffinOperationMessage message = service.addJob(js);
+ assertEquals(message, CREATE_JOB_SUCCESS);
+ }
+
+ @Test
+ public void testAddJobForFailureWithMeasureNull() throws Exception {
+ JobSchedule js = createJobSchedule();
+ given(griffinMeasureRepo.findByIdAndDeleted(js.getMeasureId(), false)).willReturn(null);
+ GriffinOperationMessage message = service.addJob(js);
+ assertEquals(message, CREATE_JOB_FAIL);
+ }
+
+ @Test
+ public void testAddJobForFailureWitJobNameRepeat() throws Exception {
+ JobSchedule js = createJobSchedule();
+ GriffinMeasure measure = createGriffinMeasure("measureName");
+ given(griffinMeasureRepo.findByIdAndDeleted(js.getMeasureId(), false)).willReturn(measure);
+ given(jobRepo.countByJobNameAndDeleted(js.getJobName(), false)).willReturn(1);
+ GriffinOperationMessage message = service.addJob(js);
+ assertEquals(message, CREATE_JOB_FAIL);
+ }
+
+ @Test
+ public void testAddJobForFailureWitJobNameNull() throws Exception {
+ JobSchedule js = createJobSchedule(null);
+ GriffinMeasure measure = createGriffinMeasure("measureName");
+ given(griffinMeasureRepo.findByIdAndDeleted(js.getMeasureId(), false)).willReturn(measure);
+ GriffinOperationMessage message = service.addJob(js);
+ assertEquals(message, CREATE_JOB_FAIL);
+ }
+
+ @Test
+ public void testAddJobForFailureWithBaselineInvalid() throws Exception {
+ JobDataSegment source = createJobDataSegment("source_name", false);
+ JobDataSegment target = createJobDataSegment("target_name", false);
+ JobSchedule js = createJobSchedule("jobName", source, target);
+ GriffinMeasure measure = createGriffinMeasure("measureName");
+ given(griffinMeasureRepo.findByIdAndDeleted(js.getMeasureId(), false)).willReturn(measure);
+ GriffinOperationMessage message = service.addJob(js);
+ assertEquals(message, CREATE_JOB_FAIL);
+ }
+
+ @Test
+ public void testAddJobForFailureWithConnectorNameInvalid() throws Exception {
+ GriffinMeasure measure = createGriffinMeasure("measureName");
+ JobDataSegment source = createJobDataSegment("source_connector_name", true);
+ JobDataSegment target = createJobDataSegment("target_name", false);
+ JobSchedule js = createJobSchedule("jobName", source, target);
+ given(griffinMeasureRepo.findByIdAndDeleted(js.getMeasureId(), false)).willReturn(measure);
+ GriffinOperationMessage message = service.addJob(js);
+ assertEquals(message, CREATE_JOB_FAIL);
+ }
+
+ @Test
+ public void testAddJobForFailureWithMeasureConnectorNameRepeat() throws Exception {
+ JobSchedule js = createJobSchedule();
+ DataConnector dcSource = createDataConnector("connector_name", "default", "test_data_src", "dt=#YYYYMMdd# AND hour=#HH#");
+ DataConnector dcTarget = createDataConnector("connector_name", "default", "test_data_tgt", "dt=#YYYYMMdd# AND hour=#HH#");
+ GriffinMeasure measure = createGriffinMeasure("measureName", dcSource, dcTarget);
+ given(griffinMeasureRepo.findByIdAndDeleted(js.getMeasureId(), false)).willReturn(measure);
+ GriffinOperationMessage message = service.addJob(js);
+ assertEquals(message, CREATE_JOB_FAIL);
+ }
+
+ @Test
+ public void testAddJobForFailureWithJobScheduleConnectorNameRepeat() throws Exception {
+ GriffinMeasure measure = createGriffinMeasure("measureName");
+ JobDataSegment source = createJobDataSegment("source_name", true);
+ JobDataSegment target = createJobDataSegment("source_name", false);
+ JobSchedule js = createJobSchedule("jobName", source, target);
+ given(griffinMeasureRepo.findByIdAndDeleted(js.getMeasureId(), false)).willReturn(measure);
+ GriffinOperationMessage message = service.addJob(js);
+ assertEquals(message, CREATE_JOB_FAIL);
+ }
+
+ @Test
+ public void testAddJobForFailureWithTriggerKeyExist() throws Exception {
+ GriffinMeasure measure = createGriffinMeasure("measureName");
+ JobDataSegment source = createJobDataSegment("source_name", true);
+ JobDataSegment target = createJobDataSegment("target_name", false);
+ JobSchedule js = createJobSchedule("jobName", source, target);
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ given(factory.getObject()).willReturn(scheduler);
+ given(griffinMeasureRepo.findByIdAndDeleted(js.getMeasureId(), false)).willReturn(measure);
+ given(scheduler.checkExists(Matchers.any(TriggerKey.class))).willReturn(true);
+ GriffinOperationMessage message = service.addJob(js);
+ assertEquals(message, CREATE_JOB_FAIL);
+ }
+
+ @Test
+ public void testDeleteJobByIdForSuccessWithTriggerKeyExist() throws SchedulerException {
+ Long jobId = 1L;
+ GriffinJob job = new GriffinJob(1L, "jobName", "quartzJobName", "quartzGroupName", false);
+ JobInstanceBean instance = new JobInstanceBean(LivySessionStates.State.finding, "pName", "pGroup", null, null);
+ job.setJobInstances(Arrays.asList(instance));
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ given(factory.getObject()).willReturn(scheduler);
+ given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(job);
+ given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(true);
+ assertEquals(service.deleteJob(jobId), DELETE_JOB_SUCCESS);
+ }
+
+ @Test
+ public void testDeleteJobByIdForSuccessWithTriggerKeyNotExist() throws SchedulerException {
+ Long jobId = 1L;
+ GriffinJob job = new GriffinJob(1L, "jobName", "quartzJobName", "quartzGroupName", false);
+ JobInstanceBean instance = new JobInstanceBean(LivySessionStates.State.finding, "pName", "pGroup", null, null);
+ job.setJobInstances(Arrays.asList(instance));
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ given(factory.getObject()).willReturn(scheduler);
+ given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(job);
+ given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(false);
+ assertEquals(service.deleteJob(jobId), DELETE_JOB_SUCCESS);
+ }
+
+ @Test
+ public void testDeleteJobByIdForFailureWithNull() throws SchedulerException {
+ Long jobId = 1L;
+ given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(null);
+ assertEquals(service.deleteJob(jobId), DELETE_JOB_FAIL);
+ }
+
+ @Test
+ public void testDeleteJobByIdForFailureWithException() throws SchedulerException {
+ Long jobId = 1L;
+ GriffinJob job = new GriffinJob(1L, "jobName", "quartzJobName", "quartzGroupName", false);
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ given(factory.getObject()).willReturn(scheduler);
+ given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(job);
+ given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(true);
+ doThrow(SchedulerException.class).when(scheduler).pauseJob(Matchers.any(JobKey.class));
+ assertEquals(service.deleteJob(jobId), DELETE_JOB_FAIL);
+ }
+
+
+ @Test
+ public void testDeleteJobByNameForSuccessWithTriggerKeyExist() throws SchedulerException {
+ GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", "quartzGroupName", false);
+ JobInstanceBean instance = new JobInstanceBean(LivySessionStates.State.finding, "pName", "pGroup", null, null);
+ job.setJobInstances(Arrays.asList(instance));
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ given(jobRepo.findByJobNameAndDeleted(job.getJobName(), false)).willReturn(Arrays.asList(job));
+ given(factory.getObject()).willReturn(scheduler);
+ given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(true);
+ assertEquals(service.deleteJob(job.getJobName()), DELETE_JOB_SUCCESS);
+ }
+
+ @Test
+ public void testDeleteJobByNameForSuccessWithTriggerKeyNotExist() throws SchedulerException {
+ GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", "quartzGroupName", false);
+ JobInstanceBean instance = new JobInstanceBean(LivySessionStates.State.finding, "pName", "pGroup", null, null);
+ job.setJobInstances(Arrays.asList(instance));
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ given(jobRepo.findByJobNameAndDeleted(job.getJobName(), false)).willReturn(Arrays.asList(job));
+ given(factory.getObject()).willReturn(scheduler);
+ given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(false);
+ assertEquals(service.deleteJob(job.getJobName()), DELETE_JOB_SUCCESS);
+ }
+
+ @Test
+ public void testDeleteJobByJobNameForFailureWithNull() throws SchedulerException {
+ String jobName = "jobName";
+ given(jobRepo.findByJobNameAndDeleted(jobName, false)).willReturn(new ArrayList<>());
+ assertEquals(service.deleteJob(jobName), DELETE_JOB_FAIL);
+ }
+
+ @Test
+ public void testDeleteJobByJobNameForFailureWithException() throws SchedulerException {
+ Long jobId = 1L;
+ GriffinJob job = new GriffinJob(1L, "jobName", "quartzJobName", "quartzGroupName", false);
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ given(factory.getObject()).willReturn(scheduler);
+ given(jobRepo.findByJobNameAndDeleted(job.getJobName(), false)).willReturn(Arrays.asList(job));
+ given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(true);
+ doThrow(SchedulerException.class).when(scheduler).pauseJob(Matchers.any(JobKey.class));
+ assertEquals(service.deleteJob(jobId), DELETE_JOB_FAIL);
+ }
+
+ @Test
+ public void testDeleteJobsRelateToMeasureForSuccessWithTriggerKeyExist() throws SchedulerException {
+ Long jobId = 1L;
+ Long measureId = 1L;
+ GriffinJob job = new GriffinJob(measureId, "jobName", "quartzJobName", "quartzGroupName", false);
+ JobInstanceBean instance = new JobInstanceBean(LivySessionStates.State.finding, "pName", "pGroup", null, null);
+ job.setJobInstances(Arrays.asList(instance));
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ given(factory.getObject()).willReturn(scheduler);
+ given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(job);
+ given(jobRepo.findByMeasureIdAndDeleted(measureId, false)).willReturn(Arrays.asList(job));
+ given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(true);
+ assertEquals(service.deleteJobsRelateToMeasure(measureId), true);
+ }
+
+ @Test
+ public void testDeleteJobsRelateToMeasureForSuccessWithTriggerKeyNotExist() throws SchedulerException {
+ Long jobId = 1L;
+ Long measureId = 1L;
+ GriffinJob job = new GriffinJob(measureId, "jobName", "quartzJobName", "quartzGroupName", false);
+ JobInstanceBean instance = new JobInstanceBean(LivySessionStates.State.finding, "pName", "pGroup", null, null);
+ job.setJobInstances(Arrays.asList(instance));
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ given(factory.getObject()).willReturn(scheduler);
+ given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(job);
+ given(jobRepo.findByMeasureIdAndDeleted(measureId, false)).willReturn(Arrays.asList(job));
+ given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(false);
+ assertEquals(service.deleteJobsRelateToMeasure(measureId), true);
+ }
+
+ @Test
+ public void testDeleteJobsRelateToMeasureForSuccessWithNull() throws SchedulerException {
+ Long measureId = 1L;
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ given(factory.getObject()).willReturn(scheduler);
+ given(jobRepo.findByMeasureIdAndDeleted(measureId, false)).willReturn(null);
+ assertEquals(service.deleteJobsRelateToMeasure(measureId), true);
+ }
+
+ @Test
+ public void testDeleteJobsRelateToMeasureForFailureWithException() throws SchedulerException {
+ Long jobId = 1L;
+ Long measureId = 1L;
+ GriffinJob job = new GriffinJob(measureId, "jobName", "quartzJobName", "quartzGroupName", false);
+ JobInstanceBean instance = new JobInstanceBean(LivySessionStates.State.finding, "pName", "pGroup", null, null);
+ job.setJobInstances(Arrays.asList(instance));
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ given(factory.getObject()).willReturn(scheduler);
+ given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(job);
+ given(jobRepo.findByMeasureIdAndDeleted(measureId, false)).willReturn(Arrays.asList(job));
+ given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(true);
+ doThrow(SchedulerException.class).when(scheduler).pauseJob(Matchers.any(JobKey.class));
+ assertEquals(service.deleteJobsRelateToMeasure(measureId), false);
+ }
+
+ @Test
+ public void testFindInstancesOfJobForSuccess() throws SchedulerException {
+ Long jobId = 1L;
+ int page = 0;
+ int size = 2;
+ GriffinJob job = new GriffinJob(1L, "jobName", "quartzJobName", "quartzGroupName", false);
+ JobInstanceBean jobInstance = new JobInstanceBean(1L, LivySessionStates.State.dead, "app_id", "app_uri", null, null);
+ Pageable pageRequest = new PageRequest(page, size, Sort.Direction.DESC, "tms");
+ given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(job);
+ given(jobInstanceRepo.findByJobId(1L, pageRequest)).willReturn(Arrays.asList(jobInstance));
+ assertEquals(service.findInstancesOfJob(1L, page, size).size(), 1);
+ }
+
+ @Test
+ public void testFindInstancesOfJobWithNull() throws SchedulerException {
+ Long jobId = 1L;
+ given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(null);
+ assertEquals(service.findInstancesOfJob(jobId, 0, 2).size(), 0);
+ }
+
+ @Test
+ public void testDeleteExpiredJobInstanceForSuccessWithTriggerKeyExist() throws SchedulerException {
+ JobInstanceBean jobInstance = new JobInstanceBean(LivySessionStates.State.dead, "pName", "pGroup", null, null);
+ given(jobInstanceRepo.findByExpireTmsLessThanEqual(Matchers.any())).willReturn(Arrays.asList(jobInstance));
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ given(factory.getObject()).willReturn(scheduler);
+ given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(true);
+ service.deleteExpiredJobInstance();
+ }
+
+ @Test
+ public void testDeleteExpiredJobInstanceForSuccessWithTriggerKeyNotExist() throws SchedulerException {
+ JobInstanceBean jobInstance = new JobInstanceBean(LivySessionStates.State.dead, "pName", "pGroup", null, null);
+ given(jobInstanceRepo.findByExpireTmsLessThanEqual(Matchers.any())).willReturn(Arrays.asList(jobInstance));
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ given(factory.getObject()).willReturn(scheduler);
+ given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(false);
+ service.deleteExpiredJobInstance();
+ }
+
+ @Test
+ public void testDeleteExpiredJobInstanceForSuccessWithNull() throws SchedulerException {
+ given(jobInstanceRepo.findByExpireTmsLessThanEqual(Matchers.any())).willReturn(null);
+ service.deleteExpiredJobInstance();
+ }
+
+ @Test
+ public void testDeleteExpiredJobInstanceForFailureWithException() throws SchedulerException {
+ JobInstanceBean jobInstance = new JobInstanceBean(LivySessionStates.State.dead, "pName", "pGroup", null, null);
+ given(jobInstanceRepo.findByExpireTmsLessThanEqual(Matchers.any())).willReturn(Arrays.asList(jobInstance));
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ given(factory.getObject()).willReturn(scheduler);
+ given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(true);
+ doThrow(SchedulerException.class).when(scheduler).pauseJob(Matchers.any(JobKey.class));
+ service.deleteExpiredJobInstance();
+ }
+
+ @Test
+ public void testSyncInstancesOfJobForSuccess() {
+ JobInstanceBean instance = createJobInstance();
+ given(jobInstanceRepo.findByActiveState()).willReturn(Arrays.asList(instance));
+ Whitebox.setInternalState(service, "restTemplate", restTemplate);
+ String result = "{\"id\":1,\"state\":\"starting\",\"appId\":123,\"appInfo\":{\"driverLogUrl\":null,\"sparkUiUrl\":null},\"log\":[]}";
+ given(restTemplate.getForObject(Matchers.anyString(), Matchers.any())).willReturn(result);
+ service.syncInstancesOfAllJobs();
+ }
+
+ @Test
+ public void testSyncInstancesOfJobForFailureWithRestClientException() {
+ JobInstanceBean instance = createJobInstance();
+ instance.setSessionId(1234564L);
+ given(jobInstanceRepo.findByActiveState()).willReturn(Arrays.asList(instance));
+ Whitebox.setInternalState(service, "restTemplate", restTemplate);
+ given(restTemplate.getForObject(Matchers.anyString(), Matchers.any())).willThrow(RestClientException.class);
+ service.syncInstancesOfAllJobs();
+ }
+
+ @Test
+ public void testSyncInstancesOfJobForFailureWithIOException() throws Exception {
+ JobInstanceBean instance = createJobInstance();
+ given(jobInstanceRepo.findByActiveState()).willReturn(Arrays.asList(instance));
+ Whitebox.setInternalState(service, "restTemplate", restTemplate);
+ given(restTemplate.getForObject(Matchers.anyString(), Matchers.any())).willReturn("result");
+ service.syncInstancesOfAllJobs();
+ }
+
+ @Test
+ public void testSyncInstancesOfJobForFailureWithIllegalArgumentException() throws Exception {
+ JobInstanceBean instance = createJobInstance();
+ given(jobInstanceRepo.findByActiveState()).willReturn(Arrays.asList(instance));
+ Whitebox.setInternalState(service, "restTemplate", restTemplate);
+ given(restTemplate.getForObject(Matchers.anyString(), Matchers.any())).willReturn("{\"state\":\"wrong\"}");
+ service.syncInstancesOfAllJobs();
+ }
+
+ @Test
+ public void testSyncInstancesOfJobForFailureWithException() throws Exception {
+ JobInstanceBean instance = createJobInstance();
+ given(jobInstanceRepo.findByActiveState()).willReturn(Arrays.asList(instance));
+ Whitebox.setInternalState(service, "restTemplate", restTemplate);
+ String result = "{\"id\":1,\"state\":\"starting\",\"appId\":123,\"appInfo\":{\"driverLogUrl\":null,\"sparkUiUrl\":null},\"log\":[]}";
+ given(restTemplate.getForObject(Matchers.anyString(), Matchers.any())).willReturn(result);
+ doThrow(Exception.class).when(jobInstanceRepo).save(Matchers.any(JobInstanceBean.class));
+ service.syncInstancesOfAllJobs();
+ }
+
+ @Test
+ public void testGetHealthInfoWithHealthy() throws SchedulerException {
+ Long jobId = 1L;
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ GriffinJob job = new GriffinJob(jobId, 1L, "jobName", "quartzJobName", "quartzGroupName", false);
+ given(factory.getObject()).willReturn(scheduler);
+ given(jobRepo.findByDeleted(false)).willReturn(Arrays.asList(job));
+ SimpleTrigger trigger = new SimpleTriggerImpl();
+ List<Trigger> triggers = new ArrayList<>();
+ triggers.add(trigger);
+ given((List<Trigger>) scheduler.getTriggersOfJob(Matchers.any(JobKey.class))).willReturn(triggers);
+
+ Pageable pageRequest = new PageRequest(0, 1, Sort.Direction.DESC, "tms");
+ given(jobInstanceRepo.findByJobId(jobId, pageRequest)).willReturn(Arrays.asList(createJobInstance()));
+ assertEquals(service.getHealthInfo().getHealthyJobCount(), 1);
+
+ }
+
+ @Test
+ public void testGetHealthInfoWithUnhealthy() throws SchedulerException {
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", "quartzGroupName", false);
+ given(factory.getObject()).willReturn(scheduler);
+ given(jobRepo.findByDeleted(false)).willReturn(Arrays.asList(job));
+ SimpleTrigger trigger = new SimpleTriggerImpl();
+ given((List<Trigger>) scheduler.getTriggersOfJob(Matchers.any(JobKey.class))).willReturn(Arrays.asList(trigger));
+
+ Pageable pageRequest = new PageRequest(0, 1, Sort.Direction.DESC, "tms");
+ List<JobInstanceBean> scheduleStateList = new ArrayList<>();
+ JobInstanceBean instance = createJobInstance();
+ instance.setState(LivySessionStates.State.error);
+ scheduleStateList.add(instance);
+ given(jobInstanceRepo.findByJobId(1L, pageRequest)).willReturn(scheduleStateList);
+ assertEquals(service.getHealthInfo().getHealthyJobCount(), 0);
+ }
+
+ @Test
+ public void testGetHealthInfoWithException() throws SchedulerException {
+ Scheduler scheduler = Mockito.mock(Scheduler.class);
+ GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", "quartzGroupName", false);
+ given(factory.getObject()).willReturn(scheduler);
+ given(jobRepo.findByDeleted(false)).willReturn(Arrays.asList(job));
+ GriffinException.GetHealthInfoFailureException exception = getExceptionForHealthInfo(scheduler);
+ assert exception != null;
+ }
+
+
+ private GriffinException.GetHealthInfoFailureException getExceptionForHealthInfo(Scheduler scheduler) throws SchedulerException {
+ GriffinException.GetHealthInfoFailureException exception = null;
+ try {
+ given(scheduler.getTriggersOfJob(Matchers.any(JobKey.class))).willThrow(SchedulerException.class);
+ service.getHealthInfo();
+ } catch (GriffinException.GetHealthInfoFailureException e) {
+ exception = e;
+ }
+ return exception;
+ }
+
+ private GriffinException.GetJobsFailureException getExceptionForGetAliveJObs(Scheduler scheduler) throws SchedulerException {
+ GriffinException.GetJobsFailureException exception = null;
+ try {
+ given(scheduler.getTriggersOfJob(Matchers.any(JobKey.class))).willThrow(new GriffinException.GetJobsFailureException());
+ service.getAliveJobs();
+ } catch (GriffinException.GetJobsFailureException e) {
+ exception = e;
+ }
+ return exception;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java b/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
index 6fe64e3..ccb641b 100644
--- a/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
+++ b/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
@@ -19,55 +19,138 @@ under the License.
package org.apache.griffin.core.job;
+import org.apache.griffin.core.job.entity.JobInstanceBean;
+import org.apache.griffin.core.job.entity.SegmentPredicate;
+import org.apache.griffin.core.job.repo.JobInstanceRepo;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
+import org.apache.griffin.core.util.JsonUtil;
+import org.apache.griffin.core.util.PropertiesUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+import static org.apache.griffin.core.util.EntityHelper.*;
import static org.junit.Assert.assertTrue;
+import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
-//@RunWith(SpringRunner.class)
-//public class SparkSubmitJobTest {
-
-// @TestConfiguration
-// public static class SchedulerServiceConfiguration {
-// @Bean
-// public SparkSubmitJob sparkSubmitJobBean() {
-// return new SparkSubmitJob();
-// }
-//
-// @Bean
-// public Properties sparkJobProps() {
-// return PropertiesUtil.getProperties("/sparkJob.properties");
-// }
-//
-// }
-//
-// @Autowired
-// private SparkSubmitJob sparkSubmitJob;
-//
-// @MockBean
-// private MeasureRepo measureRepo;
-//
-// @MockBean
-// private RestTemplate restTemplate;
-//
-// @MockBean
-// private JobInstanceRepo jobInstanceRepo;
-//
-// @Before
-// public void setUp() {
-// }
-//
-// @Test
-// public void testExecute() throws Exception {
-// String result = "{\"id\":1,\"state\":\"starting\",\"appId\":null,\"appInfo\":{\"driverLogUrl\":null,\"sparkUiUrl\":null},\"log\":[]}";
-// JobExecutionContext context = mock(JobExecutionContext.class);
-// JobDetail jd = createJobDetail();
-// given(context.getJobDetail()).willReturn(jd);
-// given(measureRepo.findOne(Long.valueOf(jd.getJobDataMap().getString("measureId")))).willReturn(createATestMeasure("view_item_hourly", "ebay"));
-// Whitebox.setInternalState(sparkSubmitJob, "restTemplate", restTemplate);
-// given(restTemplate.postForObject(Matchers.anyString(), Matchers.any(), Matchers.any())).willReturn(result);
-// given(jobInstanceRepo.save(new JobInstanceBean())).willReturn(new JobInstanceBean());
-// sparkSubmitJob.execute(context);
-// assertTrue(true);
-// }
-
-//}
+@RunWith(SpringRunner.class)
+public class SparkSubmitJobTest {
+
+ @TestConfiguration
+ public static class SchedulerServiceConfiguration {
+ @Bean
+ public SparkSubmitJob sparkSubmitJobBean() {
+ return new SparkSubmitJob();
+ }
+
+ @Bean(name = "livyConf")
+ public Properties sparkJobProps() {
+ String path = "sparkJob.properties";
+ return PropertiesUtil.getProperties(path, new ClassPathResource(path));
+ }
+
+ }
+
+ @Autowired
+ private SparkSubmitJob sparkSubmitJob;
+
+ @Autowired
+ @Qualifier("livyConf")
+ private Properties livyConfProps;
+
+ @MockBean
+ private RestTemplate restTemplate;
+
+ @MockBean
+ private JobInstanceRepo jobInstanceRepo;
+
+ @MockBean
+ private JobServiceImpl jobService;
+
+
+ @Before
+ public void setUp() {
+ }
+
+ @Test
+ public void testExecuteWithPredicateTriggerGreaterThanRepeat() throws Exception {
+ JobExecutionContext context = mock(JobExecutionContext.class);
+ JobInstanceBean instance = createJobInstance();
+ GriffinMeasure measure = createGriffinMeasure("measureName");
+ SegmentPredicate predicate = createFileExistPredicate();
+ JobDetail jd = createJobDetail(JsonUtil.toJson(measure), JsonUtil.toJson(Arrays.asList(predicate)));
+ given(context.getJobDetail()).willReturn(jd);
+ given(context.getTrigger()).willReturn(createSimpleTrigger(4, 5));
+ given(jobInstanceRepo.findByPredicateName(Matchers.anyString())).willReturn(instance);
+ sparkSubmitJob.execute(context);
+ assertTrue(true);
+ }
+
+ @Test
+ public void testExecuteWithPredicateTriggerLessThanRepeat() throws Exception {
+ JobExecutionContext context = mock(JobExecutionContext.class);
+ JobInstanceBean instance = createJobInstance();
+ GriffinMeasure measure = createGriffinMeasure("measureName");
+ SegmentPredicate predicate = createFileExistPredicate();
+ JobDetail jd = createJobDetail(JsonUtil.toJson(measure), JsonUtil.toJson(Arrays.asList(predicate)));
+ given(context.getJobDetail()).willReturn(jd);
+ given(context.getTrigger()).willReturn(createSimpleTrigger(4, 4));
+ given(jobInstanceRepo.findByPredicateName(Matchers.anyString())).willReturn(instance);
+ sparkSubmitJob.execute(context);
+ assertTrue(true);
+ }
+
+ @Test
+ public void testExecuteWithNoPredicateSuccess() throws Exception {
+ String result = "{\"id\":1,\"state\":\"starting\",\"appId\":null,\"appInfo\":{\"driverLogUrl\":null,\"sparkUiUrl\":null},\"log\":[]}";
+ JobExecutionContext context = mock(JobExecutionContext.class);
+ JobInstanceBean instance = createJobInstance();
+ GriffinMeasure measure = createGriffinMeasure("measureName");
+ JobDetail jd = createJobDetail(JsonUtil.toJson(measure), "");
+ given(context.getJobDetail()).willReturn(jd);
+ given(jobInstanceRepo.findByPredicateName(Matchers.anyString())).willReturn(instance);
+ Whitebox.setInternalState(sparkSubmitJob, "restTemplate", restTemplate);
+ given(restTemplate.postForObject(Matchers.anyString(), Matchers.any(), Matchers.any())).willReturn(result);
+ given(jobService.pauseJob(Matchers.any(), Matchers.any())).willReturn(true);
+ sparkSubmitJob.execute(context);
+ assertTrue(true);
+ }
+
+ @Test
+ public void testExecuteWithPost2LivyException() throws Exception {
+ JobExecutionContext context = mock(JobExecutionContext.class);
+ JobInstanceBean instance = createJobInstance();
+ GriffinMeasure measure = createGriffinMeasure("measureName");
+ JobDetail jd = createJobDetail(JsonUtil.toJson(measure), "");
+ given(context.getJobDetail()).willReturn(jd);
+ given(jobInstanceRepo.findByPredicateName(Matchers.anyString())).willReturn(instance);
+ given(jobService.pauseJob(Matchers.any(), Matchers.any())).willReturn(true);
+ sparkSubmitJob.execute(context);
+ assertTrue(true);
+ }
+
+ @Test
+ public void testExecuteWithNullException() throws Exception {
+ JobExecutionContext context = mock(JobExecutionContext.class);
+ sparkSubmitJob.execute(context);
+ assertTrue(true);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/job/repo/JobRepoTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/job/repo/JobRepoTest.java b/service/src/test/java/org/apache/griffin/core/job/repo/JobRepoTest.java
new file mode 100644
index 0000000..96a27e4
--- /dev/null
+++ b/service/src/test/java/org/apache/griffin/core/job/repo/JobRepoTest.java
@@ -0,0 +1,94 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.repo;
+
+import org.apache.griffin.core.job.entity.AbstractJob;
+import org.apache.griffin.core.job.entity.GriffinJob;
+import org.apache.griffin.core.job.entity.VirtualJob;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest;
+import org.springframework.boot.test.autoconfigure.orm.jpa.TestEntityManager;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(SpringRunner.class)
+@DataJpaTest
+public class JobRepoTest {
+
+ @Autowired
+ private TestEntityManager entityManager;
+
+ @Autowired
+ private JobRepo jobRepo;
+
+ @Before
+ public void setup() throws Exception {
+ entityManager.clear();
+ entityManager.flush();
+ setEntityManager();
+ }
+
+ @Test
+ public void testCountByJobNameAndDeleted() throws Exception {
+ int count = jobRepo.countByJobNameAndDeleted("griffinJobName1", false);
+ assertEquals(count, 1);
+ }
+
+ @Test
+ public void testFindByDeleted() throws Exception {
+ List<AbstractJob> jobs = jobRepo.findByDeleted(false);
+ assertEquals(jobs.size(), 4);
+ }
+
+ @Test
+ public void findByJobNameAndDeleted() throws Exception {
+ List<AbstractJob> jobs = jobRepo.findByJobNameAndDeleted("griffinJobName1", false);
+ assertEquals(jobs.size(), 1);
+ }
+
+ @Test
+ public void findByMeasureIdAndDeleted() throws Exception {
+ List<AbstractJob> jobs = jobRepo.findByMeasureIdAndDeleted(1L, false);
+ assertEquals(jobs.size(), 4);
+ }
+
+ @Test
+ public void findByIdAndDeleted() throws Exception {
+ AbstractJob job = jobRepo.findByIdAndDeleted(1L, true);
+ assert job == null;
+ }
+
+ public void setEntityManager() throws Exception {
+ AbstractJob job1 = new GriffinJob(1L, "griffinJobName1", "qName1", "qGroup1", false);
+ AbstractJob job2 = new GriffinJob(1L, "griffinJobName2", "qName2", "qGroup2", false);
+ AbstractJob job3 = new VirtualJob("virtualJobName1", 1L, "metricName1");
+ AbstractJob job4 = new VirtualJob("virtualJobName2", 1L, "metricName2");
+ entityManager.persistAndFlush(job1);
+ entityManager.persistAndFlush(job2);
+ entityManager.persistAndFlush(job3);
+ entityManager.persistAndFlush(job4);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/measure/MeasureControllerTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/measure/MeasureControllerTest.java b/service/src/test/java/org/apache/griffin/core/measure/MeasureControllerTest.java
index 82a9ff9..5eb5a11 100644
--- a/service/src/test/java/org/apache/griffin/core/measure/MeasureControllerTest.java
+++ b/service/src/test/java/org/apache/griffin/core/measure/MeasureControllerTest.java
@@ -33,9 +33,11 @@ import org.springframework.http.MediaType;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
-import java.util.*;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
-import static org.apache.griffin.core.util.EntityHelper.createATestGriffinMeasure;
+import static org.apache.griffin.core.util.EntityHelper.createGriffinMeasure;
import static org.apache.griffin.core.util.GriffinOperationMessage.*;
import static org.hamcrest.CoreMatchers.is;
import static org.mockito.BDDMockito.given;
@@ -60,7 +62,7 @@ public class MeasureControllerTest {
@Test
public void testGetAllMeasures() throws Exception {
- Measure measure = createATestGriffinMeasure("view_item_hourly", "test");
+ Measure measure = createGriffinMeasure("view_item_hourly");
given(service.getAllAliveMeasures()).willReturn(Arrays.asList(measure));
mvc.perform(get(URLHelper.API_VERSION_PATH + "/measures"))
@@ -71,7 +73,7 @@ public class MeasureControllerTest {
@Test
public void testGetMeasuresById() throws Exception {
- Measure measure = createATestGriffinMeasure("view_item_hourly", "test");
+ Measure measure = createGriffinMeasure("view_item_hourly");
given(service.getMeasureById(1L)).willReturn(measure);
mvc.perform(get(URLHelper.API_VERSION_PATH + "/measures/1"))
@@ -109,7 +111,7 @@ public class MeasureControllerTest {
@Test
public void testUpdateMeasureForSuccess() throws Exception {
- Measure measure = createATestGriffinMeasure("view_item_hourly", "test");
+ Measure measure = createGriffinMeasure("view_item_hourly");
String measureJson = JsonUtil.toJson(measure);
given(service.updateMeasure(measure)).willReturn(UPDATE_MEASURE_SUCCESS);
@@ -121,7 +123,7 @@ public class MeasureControllerTest {
@Test
public void testUpdateMeasureForNotFound() throws Exception {
- Measure measure = createATestGriffinMeasure("view_item_hourly", "test");
+ Measure measure = createGriffinMeasure("view_item_hourly");
String measureJson = JsonUtil.toJson(measure);
given(service.updateMeasure(measure)).willReturn(RESOURCE_NOT_FOUND);
@@ -134,7 +136,7 @@ public class MeasureControllerTest {
@Test
public void testUpdateMeasureForFail() throws Exception {
- Measure measure = createATestGriffinMeasure("view_item_hourly", "test");
+ Measure measure = createGriffinMeasure("view_item_hourly");
String measureJson = JsonUtil.toJson(measure);
given(service.updateMeasure(measure)).willReturn(UPDATE_MEASURE_FAIL);
@@ -148,7 +150,7 @@ public class MeasureControllerTest {
public void testGetAllMeasuresByOwner() throws Exception {
String owner = "test";
List<Measure> measureList = new LinkedList<>();
- Measure measure = createATestGriffinMeasure("view_item_hourly", owner);
+ Measure measure = createGriffinMeasure("view_item_hourly");
measureList.add(measure);
given(service.getAliveMeasuresByOwner(owner)).willReturn(measureList);
@@ -161,7 +163,7 @@ public class MeasureControllerTest {
@Test
public void testCreateNewMeasureForSuccess() throws Exception {
- Measure measure = createATestGriffinMeasure("view_item_hourly", "test");
+ Measure measure = createGriffinMeasure("view_item_hourly");
String measureJson = JsonUtil.toJson(measure);
given(service.createMeasure(measure)).willReturn(CREATE_MEASURE_SUCCESS);
@@ -173,7 +175,7 @@ public class MeasureControllerTest {
@Test
public void testCreateNewMeasureForFailWithDuplicate() throws Exception {
- Measure measure = createATestGriffinMeasure("view_item_hourly", "test");
+ Measure measure = createGriffinMeasure("view_item_hourly");
String measureJson = JsonUtil.toJson(measure);
given(service.createMeasure(measure)).willReturn(CREATE_MEASURE_FAIL_DUPLICATE);
@@ -185,7 +187,7 @@ public class MeasureControllerTest {
@Test
public void testCreateNewMeasureForFailWithSaveException() throws Exception {
- Measure measure = createATestGriffinMeasure("view_item_hourly", "test");
+ Measure measure = createGriffinMeasure("view_item_hourly");
String measureJson = JsonUtil.toJson(measure);
given(service.createMeasure(measure)).willReturn(GriffinOperationMessage.CREATE_MEASURE_FAIL);