Posted to commits@griffin.apache.org by gu...@apache.org on 2019/03/15 08:12:50 UTC

[griffin] branch master updated: [GRIFFIN-232] add support pluggable predicates

This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new 11a3011  [GRIFFIN-232] add support pluggable predicates
11a3011 is described below

commit 11a301190ac087e72880d01251a24251e5a0c8bb
Author: Borgatin Alexandr <ab...@griddynamics.com>
AuthorDate: Fri Mar 15 16:12:42 2019 +0800

    [GRIFFIN-232] add support pluggable predicates
    
    A maven-jar-plugin execution with classifier "lib" was added to the service pom so that the service can also be used as a library. This allows custom predicates to be created without recompiling the Griffin service module.
    
    The deserialization approach in SparkSubmitJob was changed so that custom predicates can use all fields of SegmentPredicate.
    
    Author: Borgatin Alexandr <ab...@griddynamics.com>
    
    Closes #484 from aborgatin/feature/GRIFFIN-232.
---
 griffin-doc/measure/predicates.md                  | 100 +++++++++++++++++++++
 service/pom.xml                                    |  17 +++-
 .../core/exception/GriffinExceptionMessage.java    |   4 +
 .../apache/griffin/core/job/SparkSubmitJob.java    |  12 +--
 .../core/job/factory/PredicatorFactory.java        |  38 +++++++-
 .../org/apache/griffin/core/util/MeasureUtil.java  |  27 +++++-
 .../griffin/core/job/SparkSubmitJobTest.java       |  34 +++++--
 .../core/job/factory/PredicatorFactoryTest.java    |  47 ++++++++++
 .../griffin/core/util/EntityMocksHelper.java       |  18 +++-
 .../apache/griffin/core/util/PredicatorMock.java   |  16 ++++
 10 files changed, 289 insertions(+), 24 deletions(-)

diff --git a/griffin-doc/measure/predicates.md b/griffin-doc/measure/predicates.md
new file mode 100644
index 0000000..389edbe
--- /dev/null
+++ b/griffin-doc/measure/predicates.md
@@ -0,0 +1,100 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# About predicates
+
+## Overview
+Predicates make Griffin check certain conditions before starting a SparkSubmitJob.
+Griffin starts the measurement only when these conditions are satisfied.
+
+## Configuring predicates
+
+To configure predicates, add a `predicates` property to a connector in the measure JSON:
+```json
+{
+    ...
+    "data.sources": [{
+        ...
+        "connectors": [{
+            ...
+            "predicates": [
+                {
+                    "type": "file.exist",
+                    "config": {
+                        "root.path": "/path/to/",
+                        "path": "file.ext,file2.txt"
+                    }
+                }
+            ]
+        }]
+    }]
+}
+```
+
+Possible values for the predicate `type`:
+- "file.exist": creates an org.apache.griffin.core.job.FileExistPredicator, which checks that the given files exist before the Spark job is started.
+```json
+{
+    "type": "file.exist",
+    "config": {
+        "root.path": "/path/to/",
+        "path": "file.ext,file2.txt"
+    }
+}
+```
+
+- "custom": the fully qualified name of the predicate class must be passed in the `class` property of `config`.
+The following example creates the same predicate as the previous one:
+```json
+{
+    "type": "custom",
+    "config": {
+        "class": "org.apache.griffin.core.job.FileExistPredicator",
+        "root.path": "/path/to/",
+        "path": "file.ext,file2.txt"
+    }
+}
+```
+Note that a custom predicate class must satisfy the following conditions:
+- implement the interface **org.apache.griffin.core.job.Predicator**
+- have a public constructor with a single argument of type **org.apache.griffin.core.job.entity.SegmentPredicate**
+
+## Deploying custom predicates
+To create and deploy a custom predicate:
+1. Build the Griffin service module with Maven. This produces two artifacts:
+   - **service-VERSION.jar**: the executable Spring Boot application
+   - **service-VERSION-lib.jar**: a plain jar that can be used as a dependency
+
+   This step is necessary because an executable Spring Boot jar cannot be used as a dependency of a plugin module.
+2. Create a module and add the artifact built in the previous step as a dependency:
+```xml
+<dependency>
+    <groupId>org.apache.griffin</groupId>
+    <artifactId>service</artifactId>
+    <classifier>lib</classifier>
+    <version>${griffin.version}</version>
+    <scope>provided</scope>
+</dependency>
+```
+3. Create a predicate class that, as described above, implements the `Predicator` interface and has a public constructor taking a `SegmentPredicate`.
+4. Package the module as a jar and place it in a directory of your choice (for example, `/path-to-jar`).
+5. Start the Griffin service with Spring Boot's `PropertiesLauncher`, setting `loader.path` to that directory:
+```
+java -cp target/service-VERSION.jar -Dloader.path=/path-to-jar/ org.springframework.boot.loader.PropertiesLauncher
+```
\ No newline at end of file
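To illustrate the two requirements listed in predicates.md, here is a minimal sketch of a custom predicate. It is hypothetical. `LocalFilesExistPredicator` is not part of Griffin and only checks the local file system through java.nio. `Predicator` and `SegmentPredicate` are simplified stand-ins for the classes that a real plugin would take from the `service-VERSION-lib.jar` dependency.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;

// Simplified, hypothetical stand-ins for org.apache.griffin.core.job.Predicator and
// org.apache.griffin.core.job.entity.SegmentPredicate; a real plugin imports Griffin's classes.
interface Predicator {
    boolean predicate() throws IOException;
}

class SegmentPredicate {
    private final String type;
    private final Map<String, Object> configMap;

    SegmentPredicate(String type, Map<String, Object> configMap) {
        this.type = type;
        this.configMap = configMap;
    }

    String getType() { return type; }

    Map<String, Object> getConfigMap() { return configMap; }
}

// Hypothetical custom predicate: true only if every comma-separated entry of
// "path" exists under "root.path" on the local file system.
class LocalFilesExistPredicator implements Predicator {
    private final String rootPath;
    private final String[] paths;

    // Public constructor with a single SegmentPredicate argument, as the factory requires.
    public LocalFilesExistPredicator(SegmentPredicate segmentPredicate) {
        Map<String, Object> config = segmentPredicate.getConfigMap();
        this.rootPath = (String) config.get("root.path");
        this.paths = ((String) config.get("path")).split(",");
    }

    @Override
    public boolean predicate() throws IOException {
        for (String p : paths) {
            Path file = Paths.get(rootPath, p.trim());
            if (!Files.exists(file)) {
                return false; // one missing file is enough to postpone the measurement
            }
        }
        return true;
    }
}
```

The constructor receives the whole SegmentPredicate. A plugin can therefore read any keys that the measure JSON places next to `class` in `config`.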
diff --git a/service/pom.xml b/service/pom.xml
index fed98bc..5647253 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -280,6 +280,22 @@ under the License.
                 </dependencies>
             </plugin>
             <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>3.1.1</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                        <configuration>
+                            <classifier>lib</classifier>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-maven-plugin</artifactId>
                 <version>${spring-boot-maven-plugin.version}</version>
@@ -304,7 +320,6 @@ under the License.
                     <source>1.8</source>
                     <target>1.8</target>
                 </configuration>
-
             </plugin>
         </plugins>
     </build>
diff --git a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
index ae6a0ea..edfe8e9 100644
--- a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
+++ b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
@@ -54,6 +54,8 @@ public enum GriffinExceptionMessage {
 
     JOB_IS_IN_PAUSED_STATUS(40015, "The job is already in paused status."),
 
+    INVALID_MEASURE_PREDICATE(40016, "The measure predicate is invalid"),
+
     //404, "Not Found"
     MEASURE_ID_DOES_NOT_EXIST(40401, "Measure id does not exist"),
 
@@ -70,6 +72,8 @@ public enum GriffinExceptionMessage {
 
     HDFS_FILE_NOT_EXIST(40407, "Hadoop data file not exist"),
 
+    PREDICATE_TYPE_NOT_FOUND(40408, "Unknown predicate type"),
+
     //409, "Conflict"
     MEASURE_NAME_ALREADY_EXIST(40901, "Measure name already exists"),
 
diff --git a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
index 28ead85..2f2a3f5 100644
--- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
@@ -185,17 +185,13 @@ public class SparkSubmitJob implements Job {
         if (StringUtils.isEmpty(json)) {
             return;
         }
-        List<Map<String, Object>> maps = toEntity(json,
-                new TypeReference<List<Map>>() {
+        List<SegmentPredicate> predicates = toEntity(json,
+                new TypeReference<List<SegmentPredicate>>() {
                 });
-        for (Map<String, Object> map : maps) {
-            SegmentPredicate sp = new SegmentPredicate();
-            sp.setType((String) map.get("type"));
-            sp.setConfigMap((Map<String, Object>) map.get("config"));
-            mPredicates.add(sp);
+        if (predicates != null) {
+            mPredicates.addAll(predicates);
         }
     }
-
     private String escapeCharacter(String str, String regex) {
         if (StringUtils.isEmpty(str)) {
             return str;
diff --git a/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java b/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
index d65e49d..d9d4473 100644
--- a/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
+++ b/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
@@ -19,25 +19,57 @@ under the License.
 
 package org.apache.griffin.core.job.factory;
 
+import org.apache.griffin.core.exception.GriffinException;
 import org.apache.griffin.core.job.FileExistPredicator;
 import org.apache.griffin.core.job.Predicator;
 import org.apache.griffin.core.job.entity.SegmentPredicate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import static org.apache.griffin.core.exception.GriffinExceptionMessage.PREDICATE_TYPE_NOT_FOUND;
+
 public class PredicatorFactory {
     private static final Logger LOGGER = LoggerFactory
             .getLogger(PredicatorFactory.class);
 
     public static Predicator newPredicateInstance(SegmentPredicate segPredicate) {
-        Predicator predicate = null;
+        Predicator predicate;
         switch (segPredicate.getType()) {
             case "file.exist":
                 predicate = new FileExistPredicator(segPredicate);
                 break;
-            default:
-                LOGGER.warn("There is no predicate type that you input.");
+            case "custom":
+                predicate = getPredicateBean(segPredicate);
                 break;
+            default:
+                throw new GriffinException.NotFoundException(PREDICATE_TYPE_NOT_FOUND);
+        }
+        return predicate;
+    }
+
+    private static Predicator getPredicateBean(SegmentPredicate segmentPredicate) {
+        Predicator predicate;
+        String predicateClassName = (String) segmentPredicate.getConfigMap().get("class");
+        try {
+            Class clazz = Class.forName(predicateClassName);
+            Constructor<Predicator> constructor = clazz.getConstructor(SegmentPredicate.class);
+            predicate = constructor.newInstance(segmentPredicate);
+        } catch (ClassNotFoundException e) {
+            String message = "There is no predicate type that you input.";
+            LOGGER.error(message, e);
+            throw new GriffinException.ServiceException(message, e);
+        } catch (NoSuchMethodException e) {
+            String message = "For predicate with type " + predicateClassName +
+                    " constructor with parameter of type " + SegmentPredicate.class.getName() + " not found";
+            LOGGER.error(message, e);
+            throw new GriffinException.ServiceException(message, e);
+        } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+            String message = "Error creating predicate bean";
+            LOGGER.error(message, e);
+            throw new GriffinException.ServiceException(message, e);
         }
         return predicate;
     }
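The `custom` branch of PredicatorFactory boils down to a reflective lookup of the `(SegmentPredicate)` constructor. The sketch below reproduces that pattern outside Griffin. It uses simplified stand-in types, and `AlwaysTruePredicator` and `PredicatorFactorySketch` are hypothetical names.

The sketch differs from the committed code in three ways:
- it uses `Class.asSubclass` instead of a raw `Class`;
- it reports a class that does not implement `Predicator`;
- it rejects a missing `class` entry explicitly.

```java
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;

// Simplified stand-ins for Griffin's Predicator and SegmentPredicate (hypothetical).
interface Predicator {
    boolean predicate() throws IOException;
}

class SegmentPredicate {
    private final String type;
    private final Map<String, Object> configMap;

    SegmentPredicate(String type, Map<String, Object> configMap) {
        this.type = type;
        this.configMap = configMap;
    }

    String getType() { return type; }

    Map<String, Object> getConfigMap() { return configMap; }
}

// Hypothetical plugin class with the public SegmentPredicate constructor the factory looks up.
class AlwaysTruePredicator implements Predicator {
    public AlwaysTruePredicator(SegmentPredicate segmentPredicate) {
    }

    @Override
    public boolean predicate() {
        return true;
    }
}

class PredicatorFactorySketch {
    static Predicator newPredicateInstance(SegmentPredicate sp) {
        switch (sp.getType()) {
            // The built-in "file.exist" case is omitted from this sketch.
            case "custom":
                return instantiate(sp);
            default:
                // The commit throws GriffinException.NotFoundException(PREDICATE_TYPE_NOT_FOUND) here.
                throw new IllegalArgumentException("Unknown predicate type: " + sp.getType());
        }
    }

    private static Predicator instantiate(SegmentPredicate sp) {
        Map<String, Object> config = sp.getConfigMap();
        Object name = config == null ? null : config.get("class");
        if (!(name instanceof String)) {
            throw new IllegalArgumentException("A custom predicate needs a \"class\" entry in its config");
        }
        String className = (String) name;
        try {
            // asSubclass throws ClassCastException if the class does not implement Predicator.
            Class<? extends Predicator> clazz = Class.forName(className).asSubclass(Predicator.class);
            // getConstructor only finds public constructors.
            return clazz.getConstructor(SegmentPredicate.class).newInstance(sp);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Predicate class not found: " + className, e);
        } catch (ClassCastException e) {
            throw new IllegalStateException(className + " does not implement Predicator", e);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(className + " has no public constructor taking a SegmentPredicate", e);
        } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("Error creating predicate " + className, e);
        }
    }
}
```

`Class.getConstructor` only returns public constructors, which is why a plugin's `SegmentPredicate` constructor has to be public.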
diff --git a/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java b/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
index 56ef204..17872f9 100644
--- a/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
@@ -20,6 +20,7 @@ under the License.
 package org.apache.griffin.core.util;
 
 import static org.apache.griffin.core.exception.GriffinExceptionMessage.INVALID_CONNECTOR_NAME;
+import static org.apache.griffin.core.exception.GriffinExceptionMessage.INVALID_MEASURE_PREDICATE;
 import static org.apache.griffin.core.exception.GriffinExceptionMessage.MISSING_METRIC_NAME;
 
 import java.util.ArrayList;
@@ -29,10 +30,10 @@ import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.griffin.core.exception.GriffinException;
-import org.apache.griffin.core.measure.entity.DataSource;
-import org.apache.griffin.core.measure.entity.ExternalMeasure;
-import org.apache.griffin.core.measure.entity.GriffinMeasure;
-import org.apache.griffin.core.measure.entity.Measure;
+import org.apache.griffin.core.job.Predicator;
+import org.apache.griffin.core.job.entity.SegmentPredicate;
+import org.apache.griffin.core.job.factory.PredicatorFactory;
+import org.apache.griffin.core.measure.entity.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +55,24 @@ public class MeasureUtil {
             throw new GriffinException.BadRequestException
                     (INVALID_CONNECTOR_NAME);
         }
+        if (!validatePredicates(measure)) {
+            throw new GriffinException.BadRequestException(INVALID_MEASURE_PREDICATE);
+        }
+    }
+
+    private static boolean validatePredicates(GriffinMeasure measure) {
+        for (DataSource dataSource : measure.getDataSources()) {
+            for (DataConnector dataConnector: dataSource.getConnectors()) {
+                for (SegmentPredicate segmentPredicate : dataConnector.getPredicates()) {
+                    try {
+                        PredicatorFactory.newPredicateInstance(segmentPredicate);
+                    } catch (Exception e) {
+                        return false;
+                    }
+                }
+            }
+        }
+        return true;
     }
 
     private static void validateExternalMeasure(ExternalMeasure measure) {
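validatePredicates accepts a measure only if every predicate on every connector can be instantiated. It maps any exception to `false`, so the BadRequest response cannot say which predicate failed.

The sketch below shows one possible variant. It uses simplified, hypothetical stand-ins for the measure entities, and the factory is passed in as a `Function`. It walks the same nesting (data sources, then connectors, then predicates) but collects one message per failure. As an extra precaution, it also treats a missing predicate list as empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified stand-ins for Griffin's measure entities.
class SegmentPredicate {
    final String type;

    SegmentPredicate(String type) { this.type = type; }
}

class DataConnector {
    final List<SegmentPredicate> predicates;

    DataConnector(List<SegmentPredicate> predicates) { this.predicates = predicates; }
}

class DataSource {
    final List<DataConnector> connectors;

    DataSource(List<DataConnector> connectors) { this.connectors = connectors; }
}

class PredicateValidation {
    // Tries to build every predicate with the given factory and records one
    // message per failing predicate instead of returning a single boolean.
    static List<String> validate(List<DataSource> sources, Function<SegmentPredicate, ?> factory) {
        List<String> errors = new ArrayList<>();
        for (DataSource source : sources) {
            for (DataConnector connector : source.connectors) {
                if (connector.predicates == null) {
                    continue; // no predicates configured on this connector
                }
                for (SegmentPredicate sp : connector.predicates) {
                    try {
                        factory.apply(sp); // instance is discarded; only construction is checked
                    } catch (RuntimeException e) {
                        errors.add("predicate type '" + sp.type + "': " + e.getMessage());
                    }
                }
            }
        }
        return errors;
    }
}
```

An empty result means the measure's predicates are valid. Otherwise the messages could be attached to the INVALID_MEASURE_PREDICATE response.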
diff --git a/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java b/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
index 3d8fe26..71b0887 100644
--- a/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
+++ b/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
@@ -19,19 +19,17 @@ under the License.
 
 package org.apache.griffin.core.job;
 
-import static org.apache.griffin.core.util.EntityMocksHelper.createFileExistPredicate;
-import static org.apache.griffin.core.util.EntityMocksHelper.createGriffinMeasure;
-import static org.apache.griffin.core.util.EntityMocksHelper.createJobDetail;
-import static org.apache.griffin.core.util.EntityMocksHelper.createJobInstance;
-import static org.apache.griffin.core.util.EntityMocksHelper.createSimpleTrigger;
+import static org.apache.griffin.core.util.EntityMocksHelper.*;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Properties;
 
+import org.apache.griffin.core.config.PropertiesConfig;
 import org.apache.griffin.core.job.entity.JobInstanceBean;
 import org.apache.griffin.core.job.entity.SegmentPredicate;
 import org.apache.griffin.core.job.repo.JobInstanceRepo;
@@ -70,7 +68,10 @@ public class SparkSubmitJobTest {
             return PropertiesUtil.getProperties(path,
                     new ClassPathResource(path));
         }
-
+        @Bean
+        public PropertiesConfig sparkConf() {
+            return new PropertiesConfig("src/test/resources", null);
+        }
     }
 
     @Autowired
@@ -185,4 +186,25 @@ public class SparkSubmitJobTest {
         sparkSubmitJob.execute(context);
     }
 
+    @Test
+    public void testMultiplePredicatesWhichReturnsTrue() throws Exception {
+        JobExecutionContext context = mock(JobExecutionContext.class);
+        JobInstanceBean instance = createJobInstance();
+        GriffinMeasure measure = createGriffinMeasure("measureName");
+        SegmentPredicate predicate = createMockPredicate();
+        SegmentPredicate secondPredicate = createMockPredicate();
+        JobDetail jd = createJobDetail(JsonUtil.toJson(measure), JsonUtil.toJson
+                (Arrays.asList(predicate, secondPredicate)));
+        given(context.getJobDetail()).willReturn(jd);
+        given(context.getTrigger()).willReturn(createSimpleTrigger(4, 5));
+        given(jobInstanceRepo.findByPredicateName(Matchers.anyString()))
+                .willReturn(instance);
+        sparkSubmitJob.execute(context);
+
+        verify(context, times(1)).getJobDetail();
+        verify(jobInstanceRepo, times(1)).findByPredicateName(
+                Matchers.anyString());
+        verify(jobInstanceRepo, times(1)).save(instance);
+    }
+
 }
diff --git a/service/src/test/java/org/apache/griffin/core/job/factory/PredicatorFactoryTest.java b/service/src/test/java/org/apache/griffin/core/job/factory/PredicatorFactoryTest.java
new file mode 100644
index 0000000..9cbab39
--- /dev/null
+++ b/service/src/test/java/org/apache/griffin/core/job/factory/PredicatorFactoryTest.java
@@ -0,0 +1,47 @@
+package org.apache.griffin.core.job.factory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.griffin.core.exception.GriffinException;
+import org.apache.griffin.core.job.FileExistPredicator;
+import org.apache.griffin.core.job.Predicator;
+import org.apache.griffin.core.job.entity.SegmentPredicate;
+import org.apache.griffin.core.util.PredicatorMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.apache.griffin.core.util.EntityMocksHelper.createFileExistPredicate;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(SpringRunner.class)
+public class PredicatorFactoryTest {
+
+    @Test
+    public void testFileExistPredicatorCreation() throws IOException {
+        Predicator predicator = PredicatorFactory.newPredicateInstance(createFileExistPredicate());
+        assertNotNull(predicator);
+        assertTrue(predicator instanceof FileExistPredicator);
+    }
+
+    @Test(expected = GriffinException.NotFoundException.class)
+    public void testUnknownPredicator() throws JsonProcessingException {
+        PredicatorFactory.newPredicateInstance(
+                new SegmentPredicate("unknown", null));
+    }
+
+    @Test
+    public void testPluggablePredicator() throws JsonProcessingException {
+        String predicatorClass = "org.apache.griffin.core.util.PredicatorMock";
+        HashMap<String, Object> map = new HashMap<>();
+        map.put("class", predicatorClass);
+        SegmentPredicate segmentPredicate = new SegmentPredicate("custom", null);
+        segmentPredicate.setConfigMap(map);
+        Predicator predicator = PredicatorFactory.newPredicateInstance(segmentPredicate);
+        assertNotNull(predicator);
+        assertTrue(predicator instanceof PredicatorMock);
+    }
+}
diff --git a/service/src/test/java/org/apache/griffin/core/util/EntityMocksHelper.java b/service/src/test/java/org/apache/griffin/core/util/EntityMocksHelper.java
index e34b39f..563210d 100644
--- a/service/src/test/java/org/apache/griffin/core/util/EntityMocksHelper.java
+++ b/service/src/test/java/org/apache/griffin/core/util/EntityMocksHelper.java
@@ -217,6 +217,7 @@ public class EntityMocksHelper {
         jobDataMap.put(MEASURE_KEY, measureJson);
         jobDataMap.put(PREDICATES_KEY, predicatesJson);
         jobDataMap.put(JOB_NAME, "jobName");
+        jobDataMap.put("jobName", "jobName");
         jobDataMap.put(PREDICATE_JOB_NAME, "predicateJobName");
         jobDataMap.put(GRIFFIN_JOB_ID, 1L);
         jobDetail.setJobDataMap(jobDataMap);
@@ -224,11 +225,24 @@ public class EntityMocksHelper {
     }
 
     public static SegmentPredicate createFileExistPredicate()
-        throws JsonProcessingException {
+            throws IOException {
         Map<String, String> config = new HashMap<>();
         config.put("root.path", "hdfs:///griffin/demo_src");
         config.put("path", "/dt=#YYYYMMdd#/hour=#HH#/_DONE");
-        return new SegmentPredicate("file.exist", config);
+        SegmentPredicate segmentPredicate = new SegmentPredicate("file.exist", config);
+        segmentPredicate.setId(1L);
+        segmentPredicate.load();
+        return segmentPredicate;
+    }
+
+    public static SegmentPredicate createMockPredicate()
+            throws IOException {
+        Map<String, String> config = new HashMap<>();
+        config.put("class", "org.apache.griffin.core.util.PredicatorMock");
+        SegmentPredicate segmentPredicate = new SegmentPredicate("custom", config);
+        segmentPredicate.setId(1L);
+        segmentPredicate.load();
+        return segmentPredicate;
     }
 
     public static Map<String, Object> createJobDetailMap() {
diff --git a/service/src/test/java/org/apache/griffin/core/util/PredicatorMock.java b/service/src/test/java/org/apache/griffin/core/util/PredicatorMock.java
new file mode 100644
index 0000000..950bb68
--- /dev/null
+++ b/service/src/test/java/org/apache/griffin/core/util/PredicatorMock.java
@@ -0,0 +1,16 @@
+package org.apache.griffin.core.util;
+
+import org.apache.griffin.core.job.Predicator;
+import org.apache.griffin.core.job.entity.SegmentPredicate;
+
+import java.io.IOException;
+
+public class PredicatorMock implements Predicator {
+    public PredicatorMock(SegmentPredicate segmentPredicate) {
+    }
+
+    @Override
+    public boolean predicate() throws IOException {
+        return true;
+    }
+}