Posted to commits@griffin.apache.org by gu...@apache.org on 2019/03/15 08:12:50 UTC
[griffin] branch master updated: [GRIFFIN-232] add support pluggable predicates
This is an automated email from the ASF dual-hosted git repository.
guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new 11a3011 [GRIFFIN-232] add support pluggable predicates
11a3011 is described below
commit 11a301190ac087e72880d01251a24251e5a0c8bb
Author: Borgatin Alexandr <ab...@griddynamics.com>
AuthorDate: Fri Mar 15 16:12:42 2019 +0800
[GRIFFIN-232] add support pluggable predicates
Classifier "exec" added to spring-boot-maven-plugin so that the service can be used as a library. This allows custom predicates to be created without recompiling the Griffin service module.
The deserialisation approach in SparkSubmit was changed so that custom predicates can use all fields of SegmentPredicate.
Author: Borgatin Alexandr <ab...@griddynamics.com>
Closes #484 from aborgatin/feature/GRIFFIN-232.
---
griffin-doc/measure/predicates.md | 100 +++++++++++++++++++++
service/pom.xml | 17 +++-
.../core/exception/GriffinExceptionMessage.java | 4 +
.../apache/griffin/core/job/SparkSubmitJob.java | 12 +--
.../core/job/factory/PredicatorFactory.java | 38 +++++++-
.../org/apache/griffin/core/util/MeasureUtil.java | 27 +++++-
.../griffin/core/job/SparkSubmitJobTest.java | 34 +++++--
.../core/job/factory/PredicatorFactoryTest.java | 47 ++++++++++
.../griffin/core/util/EntityMocksHelper.java | 18 +++-
.../apache/griffin/core/util/PredicatorMock.java | 16 ++++
10 files changed, 289 insertions(+), 24 deletions(-)
diff --git a/griffin-doc/measure/predicates.md b/griffin-doc/measure/predicates.md
new file mode 100644
index 0000000..389edbe
--- /dev/null
+++ b/griffin-doc/measure/predicates.md
@@ -0,0 +1,100 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# About predicates
+
+## Overview
+Predicates make Griffin check certain conditions before it starts a SparkSubmitJob.
+Depending on the outcome of these checks, Griffin either starts the measurement or does not.
+
+## Configure predicates
+
+To configure predicates, add a "predicates" property to the measure JSON:
+```
+{
+ ...
+ "data.sources": [
+ ...
+ "connectors": [
+ "predicates": [
+ {
+ "type": "file.exist",
+ "config": {
+ "root.path": "/path/to/",
+ "path": "file.ext,file2.txt"
+ }
+ }
+ ],
+ ...
+
+ ]
+}
+```
+
+Possible values for predicates.type:
+- "file.exist" - creates a predicate of class org.apache.griffin.core.job.FileExistPredicator. This predicate checks that the given files exist before Spark jobs are started.
+```
+{
+    "type": "file.exist",
+    "config": {
+        "root.path": "/path/to/",
+        "path": "file.ext,file2.txt"
+    }
+}
+```
+
+- "custom" - in this case the class name must be passed in the "class" property of config.
+This example creates the same predicate as the previous example:
+```
+{
+    "type": "custom",
+    "config": {
+        "class": "org.apache.griffin.core.job.FileExistPredicator",
+        "root.path": "/path/to/",
+        "path": "file.ext,file2.txt"
+    }
+}
+```
+Note that the predicate class must satisfy the following conditions:
+- implement the interface **org.apache.griffin.core.job.Predicator**
+- have a constructor with an argument of type **org.apache.griffin.core.job.entity.SegmentPredicate**
+
+## Deploying custom predicates
+To create a custom predicate:
+1. Build the Griffin service using command
+As a result, two artifacts will be built:
+- **service-VERSION.jar** - the executable Spring Boot application
+- **service-VERSION-lib.jar** - a jar that can be used as a dependency
+This step is necessary because an executable Spring Boot application cannot be used as a dependency in a plugin module.
+2. Create a module and add the artifact built in the previous step as a dependency
+```
+<dependency>
+    <groupId>org.apache.griffin</groupId>
+    <artifactId>service</artifactId>
+    <classifier>lib</classifier>
+    <version>${griffin.version}</version>
+    <scope>provided</scope>
+</dependency>
+```
+3. Create a predicate class which, as mentioned earlier, implements the Predicator interface and has a constructor with an argument of type SegmentPredicate
+4. Build the module into a jar file and put it in any folder (for example /path-to-jar)
+5. Start the Griffin service application using the command
+```
+java -cp target/service-VERSION.jar -Dloader.path=/path-to-jar/ org.springframework.boot.loader.PropertiesLauncher
+```
\ No newline at end of file
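The two requirements that predicates.md places on a custom predicate class (implement Predicator, and have a constructor taking a SegmentPredicate) can be illustrated with a minimal sketch. The `Predicator` and `SegmentPredicate` types below are simplified stand-ins so that the example compiles without the Griffin service jar; in a real plugin module they would presumably come from `org.apache.griffin.core.job` and `org.apache.griffin.core.job.entity`. `LocalFilesExistPredicator` and its use of the local filesystem are hypothetical and chosen only for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;

// Stand-in for org.apache.griffin.core.job.Predicator
// (same method shape as PredicatorMock in this patch).
interface Predicator {
    boolean predicate() throws IOException;
}

// Simplified stand-in for org.apache.griffin.core.job.entity.SegmentPredicate.
class SegmentPredicate {
    private final String type;
    private final Map<String, Object> configMap;

    SegmentPredicate(String type, Map<String, Object> configMap) {
        this.type = type;
        this.configMap = configMap;
    }

    String getType() {
        return type;
    }

    Map<String, Object> getConfigMap() {
        return configMap;
    }
}

// Hypothetical custom predicate: true only if every listed file exists locally.
class LocalFilesExistPredicator implements Predicator {
    private final Path root;
    private final String[] names;

    // Custom predicates are created through a public one-argument constructor.
    public LocalFilesExistPredicator(SegmentPredicate segmentPredicate) {
        Map<String, Object> config = segmentPredicate.getConfigMap();
        this.root = Paths.get((String) config.get("root.path"));
        this.names = ((String) config.get("path")).split(",");
    }

    @Override
    public boolean predicate() throws IOException {
        for (String name : names) {
            // Any missing file makes the whole predicate fail.
            if (!Files.exists(root.resolve(name.trim()))) {
                return false;
            }
        }
        return true;
    }
}
```

In a deployment, such a class would be referenced through the "class" property of a "custom" predicate, next to whatever config keys the class reads (here "root.path" and "path").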
diff --git a/service/pom.xml b/service/pom.xml
index fed98bc..5647253 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -280,6 +280,22 @@ under the License.
</dependencies>
</plugin>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>3.1.1</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>lib</classifier>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot-maven-plugin.version}</version>
@@ -304,7 +320,6 @@ under the License.
<source>1.8</source>
<target>1.8</target>
</configuration>
-
</plugin>
</plugins>
</build>
diff --git a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
index ae6a0ea..edfe8e9 100644
--- a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
+++ b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
@@ -54,6 +54,8 @@ public enum GriffinExceptionMessage {
JOB_IS_IN_PAUSED_STATUS(40015, "The job is already in paused status."),
+ INVALID_MEASURE_PREDICATE(40016, "The measure predicate is invalid"),
+
//404, "Not Found"
MEASURE_ID_DOES_NOT_EXIST(40401, "Measure id does not exist"),
@@ -70,6 +72,8 @@ public enum GriffinExceptionMessage {
HDFS_FILE_NOT_EXIST(40407, "Hadoop data file not exist"),
+ PREDICATE_TYPE_NOT_FOUND(40408, "Unknown predicate type"),
+
//409, "Conflict"
MEASURE_NAME_ALREADY_EXIST(40901, "Measure name already exists"),
diff --git a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
index 28ead85..2f2a3f5 100644
--- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
@@ -185,17 +185,13 @@ public class SparkSubmitJob implements Job {
if (StringUtils.isEmpty(json)) {
return;
}
- List<Map<String, Object>> maps = toEntity(json,
- new TypeReference<List<Map>>() {
+ List<SegmentPredicate> predicates = toEntity(json,
+ new TypeReference<List<SegmentPredicate>>() {
});
- for (Map<String, Object> map : maps) {
- SegmentPredicate sp = new SegmentPredicate();
- sp.setType((String) map.get("type"));
- sp.setConfigMap((Map<String, Object>) map.get("config"));
- mPredicates.add(sp);
+ if (predicates != null) {
+ mPredicates.addAll(predicates);
}
}
-
private String escapeCharacter(String str, String regex) {
if (StringUtils.isEmpty(str)) {
return str;
diff --git a/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java b/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
index d65e49d..d9d4473 100644
--- a/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
+++ b/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
@@ -19,25 +19,57 @@ under the License.
package org.apache.griffin.core.job.factory;
+import org.apache.griffin.core.exception.GriffinException;
import org.apache.griffin.core.job.FileExistPredicator;
import org.apache.griffin.core.job.Predicator;
import org.apache.griffin.core.job.entity.SegmentPredicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import static org.apache.griffin.core.exception.GriffinExceptionMessage.PREDICATE_TYPE_NOT_FOUND;
+
public class PredicatorFactory {
private static final Logger LOGGER = LoggerFactory
.getLogger(PredicatorFactory.class);
public static Predicator newPredicateInstance(SegmentPredicate segPredicate) {
- Predicator predicate = null;
+ Predicator predicate;
switch (segPredicate.getType()) {
case "file.exist":
predicate = new FileExistPredicator(segPredicate);
break;
- default:
- LOGGER.warn("There is no predicate type that you input.");
+ case "custom":
+ predicate = getPredicateBean(segPredicate);
break;
+ default:
+ throw new GriffinException.NotFoundException(PREDICATE_TYPE_NOT_FOUND);
+ }
+ return predicate;
+ }
+
+ private static Predicator getPredicateBean(SegmentPredicate segmentPredicate) {
+ Predicator predicate;
+ String predicateClassName = (String) segmentPredicate.getConfigMap().get("class");
+ try {
+ Class clazz = Class.forName(predicateClassName);
+ Constructor<Predicator> constructor = clazz.getConstructor(SegmentPredicate.class);
+ predicate = constructor.newInstance(segmentPredicate);
+ } catch (ClassNotFoundException e) {
+ String message = "There is no predicate type that you input.";
+ LOGGER.error(message, e);
+ throw new GriffinException.ServiceException(message, e);
+ } catch (NoSuchMethodException e) {
+ String message = "For predicate with type " + predicateClassName +
+ " constructor with parameter of type " + SegmentPredicate.class.getName() + " not found";
+ LOGGER.error(message, e);
+ throw new GriffinException.ServiceException(message, e);
+ } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+ String message = "Error creating predicate bean";
+ LOGGER.error(message, e);
+ throw new GriffinException.ServiceException(message, e);
}
return predicate;
}
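The reflective lookup added in getPredicateBean (read the "class" entry from the config map, find the one-argument constructor, instantiate) can be sketched in isolation as follows. This is not the patch's code: the nested `Predicator` and `SegmentPredicate` types are simplified stand-ins, `AlwaysTrue` is a hypothetical plug-in class, and the use of `Class.asSubclass` is a choice made for this sketch so that a class which does not implement the interface is rejected before it is instantiated.

```java
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.Map;

class ReflectiveLoadingSketch {

    // Stand-in for org.apache.griffin.core.job.Predicator.
    interface Predicator {
        boolean predicate() throws IOException;
    }

    // Simplified stand-in for org.apache.griffin.core.job.entity.SegmentPredicate.
    static class SegmentPredicate {
        private final String type;
        private final Map<String, Object> configMap;

        SegmentPredicate(String type, Map<String, Object> configMap) {
            this.type = type;
            this.configMap = configMap;
        }

        String getType() {
            return type;
        }

        Map<String, Object> getConfigMap() {
            return configMap;
        }
    }

    // Hypothetical plug-in class with the required public one-argument constructor.
    public static class AlwaysTrue implements Predicator {
        public AlwaysTrue(SegmentPredicate segmentPredicate) {
        }

        @Override
        public boolean predicate() {
            return true;
        }
    }

    // Loads the class named in config["class"] and instantiates it reflectively.
    static Predicator load(SegmentPredicate segmentPredicate)
            throws ReflectiveOperationException {
        String className = (String) segmentPredicate.getConfigMap().get("class");
        // asSubclass throws ClassCastException if the class is not a Predicator.
        Class<? extends Predicator> clazz =
                Class.forName(className).asSubclass(Predicator.class);
        Constructor<? extends Predicator> constructor =
                clazz.getConstructor(SegmentPredicate.class);
        return constructor.newInstance(segmentPredicate);
    }
}
```

A caller would wrap `ReflectiveOperationException` in whatever service-level exception it uses, much as the patch maps each reflective failure to a GriffinException.ServiceException.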
diff --git a/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java b/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
index 56ef204..17872f9 100644
--- a/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
@@ -20,6 +20,7 @@ under the License.
package org.apache.griffin.core.util;
import static org.apache.griffin.core.exception.GriffinExceptionMessage.INVALID_CONNECTOR_NAME;
+import static org.apache.griffin.core.exception.GriffinExceptionMessage.INVALID_MEASURE_PREDICATE;
import static org.apache.griffin.core.exception.GriffinExceptionMessage.MISSING_METRIC_NAME;
import java.util.ArrayList;
@@ -29,10 +30,10 @@ import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.griffin.core.exception.GriffinException;
-import org.apache.griffin.core.measure.entity.DataSource;
-import org.apache.griffin.core.measure.entity.ExternalMeasure;
-import org.apache.griffin.core.measure.entity.GriffinMeasure;
-import org.apache.griffin.core.measure.entity.Measure;
+import org.apache.griffin.core.job.Predicator;
+import org.apache.griffin.core.job.entity.SegmentPredicate;
+import org.apache.griffin.core.job.factory.PredicatorFactory;
+import org.apache.griffin.core.measure.entity.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +55,24 @@ public class MeasureUtil {
throw new GriffinException.BadRequestException
(INVALID_CONNECTOR_NAME);
}
+ if (!validatePredicates(measure)) {
+ throw new GriffinException.BadRequestException(INVALID_MEASURE_PREDICATE);
+ }
+ }
+
+ private static boolean validatePredicates(GriffinMeasure measure) {
+ for (DataSource dataSource : measure.getDataSources()) {
+ for (DataConnector dataConnector: dataSource.getConnectors()) {
+ for (SegmentPredicate segmentPredicate : dataConnector.getPredicates()) {
+ try {
+ PredicatorFactory.newPredicateInstance(segmentPredicate);
+ } catch (Exception e) {
+ return false;
+ }
+ }
+ }
+ }
+ return true;
}
private static void validateExternalMeasure(ExternalMeasure measure) {
diff --git a/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java b/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
index 3d8fe26..71b0887 100644
--- a/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
+++ b/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
@@ -19,19 +19,17 @@ under the License.
package org.apache.griffin.core.job;
-import static org.apache.griffin.core.util.EntityMocksHelper.createFileExistPredicate;
-import static org.apache.griffin.core.util.EntityMocksHelper.createGriffinMeasure;
-import static org.apache.griffin.core.util.EntityMocksHelper.createJobDetail;
-import static org.apache.griffin.core.util.EntityMocksHelper.createJobInstance;
-import static org.apache.griffin.core.util.EntityMocksHelper.createSimpleTrigger;
+import static org.apache.griffin.core.util.EntityMocksHelper.*;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
+import org.apache.griffin.core.config.PropertiesConfig;
import org.apache.griffin.core.job.entity.JobInstanceBean;
import org.apache.griffin.core.job.entity.SegmentPredicate;
import org.apache.griffin.core.job.repo.JobInstanceRepo;
@@ -70,7 +68,10 @@ public class SparkSubmitJobTest {
return PropertiesUtil.getProperties(path,
new ClassPathResource(path));
}
-
+ @Bean
+ public PropertiesConfig sparkConf() {
+ return new PropertiesConfig("src/test/resources", null);
+ }
}
@Autowired
@@ -185,4 +186,25 @@ public class SparkSubmitJobTest {
sparkSubmitJob.execute(context);
}
+ @Test
+ public void testMultiplePredicatesWhichReturnsTrue() throws Exception {
+ JobExecutionContext context = mock(JobExecutionContext.class);
+ JobInstanceBean instance = createJobInstance();
+ GriffinMeasure measure = createGriffinMeasure("measureName");
+ SegmentPredicate predicate = createMockPredicate();
+ SegmentPredicate secondPredicate = createMockPredicate();
+ JobDetail jd = createJobDetail(JsonUtil.toJson(measure), JsonUtil.toJson
+ (Arrays.asList(predicate, secondPredicate)));
+ given(context.getJobDetail()).willReturn(jd);
+ given(context.getTrigger()).willReturn(createSimpleTrigger(4, 5));
+ given(jobInstanceRepo.findByPredicateName(Matchers.anyString()))
+ .willReturn(instance);
+ sparkSubmitJob.execute(context);
+
+ verify(context, times(1)).getJobDetail();
+ verify(jobInstanceRepo, times(1)).findByPredicateName(
+ Matchers.anyString());
+ verify(jobInstanceRepo, times(1)).save(instance);
+ }
+
}
diff --git a/service/src/test/java/org/apache/griffin/core/job/factory/PredicatorFactoryTest.java b/service/src/test/java/org/apache/griffin/core/job/factory/PredicatorFactoryTest.java
new file mode 100644
index 0000000..9cbab39
--- /dev/null
+++ b/service/src/test/java/org/apache/griffin/core/job/factory/PredicatorFactoryTest.java
@@ -0,0 +1,47 @@
+package org.apache.griffin.core.job.factory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.griffin.core.exception.GriffinException;
+import org.apache.griffin.core.job.FileExistPredicator;
+import org.apache.griffin.core.job.Predicator;
+import org.apache.griffin.core.job.entity.SegmentPredicate;
+import org.apache.griffin.core.util.PredicatorMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.apache.griffin.core.util.EntityMocksHelper.createFileExistPredicate;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(SpringRunner.class)
+public class PredicatorFactoryTest {
+
+ @Test
+ public void testFileExistPredicatorCreation() throws IOException {
+ Predicator predicator = PredicatorFactory.newPredicateInstance(createFileExistPredicate());
+ assertNotNull(predicator);
+ assertTrue(predicator instanceof FileExistPredicator);
+ }
+
+ @Test(expected = GriffinException.NotFoundException.class)
+ public void testUnknownPredicator() throws JsonProcessingException {
+ PredicatorFactory.newPredicateInstance(
+ new SegmentPredicate("unknown", null));
+ }
+
+ @Test
+ public void testPluggablePredicator() throws JsonProcessingException {
+ String predicatorClass = "org.apache.griffin.core.util.PredicatorMock";
+ HashMap<String, Object> map = new HashMap<>();
+ map.put("class", predicatorClass);
+ SegmentPredicate segmentPredicate = new SegmentPredicate("custom", null);
+ segmentPredicate.setConfigMap(map);
+ Predicator predicator = PredicatorFactory.newPredicateInstance(segmentPredicate);
+ assertNotNull(predicator);
+ assertTrue(predicator instanceof PredicatorMock);
+ }
+}
diff --git a/service/src/test/java/org/apache/griffin/core/util/EntityMocksHelper.java b/service/src/test/java/org/apache/griffin/core/util/EntityMocksHelper.java
index e34b39f..563210d 100644
--- a/service/src/test/java/org/apache/griffin/core/util/EntityMocksHelper.java
+++ b/service/src/test/java/org/apache/griffin/core/util/EntityMocksHelper.java
@@ -217,6 +217,7 @@ public class EntityMocksHelper {
jobDataMap.put(MEASURE_KEY, measureJson);
jobDataMap.put(PREDICATES_KEY, predicatesJson);
jobDataMap.put(JOB_NAME, "jobName");
+ jobDataMap.put("jobName", "jobName");
jobDataMap.put(PREDICATE_JOB_NAME, "predicateJobName");
jobDataMap.put(GRIFFIN_JOB_ID, 1L);
jobDetail.setJobDataMap(jobDataMap);
@@ -224,11 +225,24 @@ public class EntityMocksHelper {
}
public static SegmentPredicate createFileExistPredicate()
- throws JsonProcessingException {
+ throws IOException {
Map<String, String> config = new HashMap<>();
config.put("root.path", "hdfs:///griffin/demo_src");
config.put("path", "/dt=#YYYYMMdd#/hour=#HH#/_DONE");
- return new SegmentPredicate("file.exist", config);
+ SegmentPredicate segmentPredicate = new SegmentPredicate("file.exist", config);
+ segmentPredicate.setId(1L);
+ segmentPredicate.load();
+ return segmentPredicate;
+ }
+
+ public static SegmentPredicate createMockPredicate()
+ throws IOException {
+ Map<String, String> config = new HashMap<>();
+ config.put("class", "org.apache.griffin.core.util.PredicatorMock");
+ SegmentPredicate segmentPredicate = new SegmentPredicate("custom", config);
+ segmentPredicate.setId(1L);
+ segmentPredicate.load();
+ return segmentPredicate;
}
public static Map<String, Object> createJobDetailMap() {
diff --git a/service/src/test/java/org/apache/griffin/core/util/PredicatorMock.java b/service/src/test/java/org/apache/griffin/core/util/PredicatorMock.java
new file mode 100644
index 0000000..950bb68
--- /dev/null
+++ b/service/src/test/java/org/apache/griffin/core/util/PredicatorMock.java
@@ -0,0 +1,16 @@
+package org.apache.griffin.core.util;
+
+import org.apache.griffin.core.job.Predicator;
+import org.apache.griffin.core.job.entity.SegmentPredicate;
+
+import java.io.IOException;
+
+public class PredicatorMock implements Predicator {
+ public PredicatorMock(SegmentPredicate segmentPredicate) {
+ }
+
+ @Override
+ public boolean predicate() throws IOException {
+ return true;
+ }
+}