You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/08/16 10:57:29 UTC

[4/4] incubator-griffin git commit: Fix bugs about code style violation against Griffin rules

Fix bugs about code style violation against Griffin rules

Author: Eugene <to...@163.com>

Closes #388 from toyboxman/wrap.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/3bbbcb32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/3bbbcb32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/3bbbcb32

Branch: refs/heads/master
Commit: 3bbbcb32686bb691bdef9af71ef48685d04ea63f
Parents: 10f8103
Author: Eugene <to...@163.com>
Authored: Thu Aug 16 18:57:19 2018 +0800
Committer: William Guo <gu...@apache.org>
Committed: Thu Aug 16 18:57:19 2018 +0800

----------------------------------------------------------------------
 .../griffin/core/GriffinWebApplication.java     |   5 +-
 .../core/config/EclipseLinkJpaConfig.java       |  10 +-
 .../apache/griffin/core/config/EnvConfig.java   |  10 +-
 .../apache/griffin/core/config/LoginConfig.java |   3 +-
 .../griffin/core/config/PropertiesConfig.java   |  32 ++--
 .../griffin/core/config/SchedulerConfig.java    |   6 +-
 .../core/exception/GriffinExceptionHandler.java |  19 +-
 .../core/exception/GriffinExceptionMessage.java |  37 +++-
 .../exception/GriffinExceptionResponse.java     |   6 +-
 .../griffin/core/job/BatchJobOperatorImpl.java  | 109 ++++++++----
 .../griffin/core/job/FileExistPredicator.java   |  12 +-
 .../apache/griffin/core/job/JobController.java  |  29 ++-
 .../apache/griffin/core/job/JobInstance.java    |  94 ++++++----
 .../apache/griffin/core/job/JobOperator.java    |   9 +-
 .../apache/griffin/core/job/JobServiceImpl.java | 175 +++++++++++++------
 .../apache/griffin/core/job/SparkSubmitJob.java |  50 ++++--
 .../core/job/StreamingJobOperatorImpl.java      |  97 ++++++----
 .../griffin/core/job/entity/AbstractJob.java    |  29 ++-
 .../griffin/core/job/entity/BatchJob.java       |  10 +-
 .../griffin/core/job/entity/JobDataSegment.java |  12 +-
 .../core/job/entity/JobInstanceBean.java        |  12 +-
 .../griffin/core/job/entity/JobState.java       |   3 +-
 .../core/job/entity/LivySessionStates.java      |  15 +-
 .../core/job/entity/SegmentPredicate.java       |   8 +-
 .../griffin/core/job/entity/StreamingJob.java   |   3 +-
 .../factory/AutowiringSpringBeanJobFactory.java |   3 +-
 .../core/job/factory/PredicatorFactory.java     |   3 +-
 .../griffin/core/job/repo/JobInstanceRepo.java  |   9 +-
 .../apache/griffin/core/job/repo/JobRepo.java   |   3 +-
 .../griffin/core/login/LoginController.java     |   3 +-
 .../core/login/LoginServiceDefaultImpl.java     |   2 +-
 .../core/login/LoginServiceLdapImpl.java        |  24 ++-
 .../measure/ExternalMeasureOperatorImpl.java    |   6 +-
 .../measure/GriffinMeasureOperatorImpl.java     |   3 +-
 .../griffin/core/measure/MeasureController.java |  12 +-
 .../core/measure/MeasureOrgController.java      |   4 +-
 .../griffin/core/measure/MeasureOrgService.java |   5 +-
 .../core/measure/MeasureOrgServiceImpl.java     |  19 +-
 .../core/measure/MeasureServiceImpl.java        |  27 ++-
 .../core/measure/entity/DataConnector.java      |  32 ++--
 .../griffin/core/measure/entity/DataSource.java |   8 +-
 .../core/measure/entity/EvaluateRule.java       |   3 +-
 .../core/measure/entity/ExternalMeasure.java    |   3 +-
 .../core/measure/entity/GriffinMeasure.java     |  22 ++-
 .../griffin/core/measure/entity/Measure.java    |  10 +-
 .../griffin/core/measure/entity/Rule.java       |  25 ++-
 .../measure/entity/StreamingPreProcess.java     |   5 +-
 .../metastore/hive/HiveMetaStoreController.java |   3 +-
 .../core/metastore/hive/HiveMetaStoreProxy.java |  13 +-
 .../hive/HiveMetaStoreServiceImpl.java          |  32 ++--
 .../metastore/kafka/KafkaSchemaController.java  |   6 +-
 .../metastore/kafka/KafkaSchemaServiceImpl.java |  21 ++-
 .../griffin/core/metric/MetricController.java   |  17 +-
 .../griffin/core/metric/MetricService.java      |   3 +-
 .../griffin/core/metric/MetricServiceImpl.java  |  48 +++--
 .../apache/griffin/core/metric/MetricStore.java |   6 +-
 .../griffin/core/metric/MetricStoreImpl.java    |  81 ++++++---
 .../griffin/core/metric/model/Metric.java       |   3 +-
 .../org/apache/griffin/core/util/FSUtil.java    |  33 ++--
 .../org/apache/griffin/core/util/FileUtil.java  |   9 +-
 .../org/apache/griffin/core/util/JsonUtil.java  |  24 ++-
 .../apache/griffin/core/util/MeasureUtil.java   |  12 +-
 .../griffin/core/util/PropertiesUtil.java       |  10 +-
 .../org/apache/griffin/core/util/TimeUtil.java  |  23 ++-
 .../apache/griffin/core/util/YarnNetUtil.java   |   7 +-
 .../config/EclipseLinkJpaConfigForTest.java     |  11 +-
 .../core/config/PropertiesConfigTest.java       |   8 +-
 .../core/info/GriffinInfoControllerTest.java    |   2 +-
 .../griffin/core/job/JobControllerTest.java     |  35 ++--
 .../core/job/JobInstanceBeanRepoTest.java       |  27 ++-
 .../griffin/core/job/JobInstanceTest.java       |  44 +++--
 .../griffin/core/job/SparkSubmitJobTest.java    |  44 +++--
 .../core/job/repo/JobInstanceRepoTest.java      |   8 +-
 .../griffin/core/job/repo/JobRepoTest.java      |  11 +-
 .../ExternalMeasureOperatorImplTest.java        |   9 +-
 .../measure/GriffinMeasureOperatorImplTest.java |  12 +-
 .../core/measure/MeasureControllerTest.java     |  36 ++--
 .../core/measure/MeasureOrgControllerTest.java  |   3 +-
 .../core/measure/MeasureOrgServiceImplTest.java |  23 ++-
 .../core/measure/MeasureServiceImplTest.java    |  69 +++++---
 .../measure/repo/DataConnectorRepoTest.java     |  17 +-
 .../core/measure/repo/MeasureRepoTest.java      |   3 +-
 .../hive/HiveMetaStoreControllerTest.java       |  28 ++-
 .../hive/HiveMetaStoreServiceImplTest.java      |   9 +-
 .../kafka/KafkaSchemaControllerTest.java        |  12 +-
 .../core/metric/MetricControllerTest.java       |  49 ++++--
 .../core/metric/MetricServiceImplTest.java      |  54 ++++--
 .../core/metric/MetricStoreImplTest.java        |   9 +-
 .../apache/griffin/core/util/EntityHelper.java  | 115 +++++++-----
 .../apache/griffin/core/util/JsonUtilTest.java  |  15 +-
 .../griffin/core/util/PropertiesUtilTest.java   |  11 +-
 .../apache/griffin/core/util/TimeUtilTest.java  |   8 +-
 92 files changed, 1374 insertions(+), 675 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java b/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java
index e5f877b..4ed4773 100644
--- a/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java
+++ b/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java
@@ -31,7 +31,8 @@ import org.springframework.scheduling.annotation.EnableScheduling;
 @SpringBootApplication
 @EnableScheduling
 public class GriffinWebApplication {
-    private static final Logger LOGGER = LoggerFactory.getLogger(GriffinWebApplication.class);
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(GriffinWebApplication.class);
 
     public static void main(String[] args) {
         LOGGER.info("application start");
@@ -43,4 +44,4 @@ public class GriffinWebApplication {
         return new SimpleCORSFilter();
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java b/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java
index ffb013a..1493569 100644
--- a/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java
@@ -36,9 +36,10 @@ import org.springframework.transaction.jta.JtaTransactionManager;
 @Configuration
 @ComponentScan("org.apache.griffin.core")
 public class EclipseLinkJpaConfig extends JpaBaseConfiguration {
-    protected EclipseLinkJpaConfig(DataSource ds, JpaProperties properties,
-                                   ObjectProvider<JtaTransactionManager> jtm,
-                                   ObjectProvider<TransactionManagerCustomizers> tmc) {
+    protected EclipseLinkJpaConfig(
+            DataSource ds, JpaProperties properties,
+            ObjectProvider<JtaTransactionManager> jtm,
+            ObjectProvider<TransactionManagerCustomizers> tmc) {
         super(ds, properties, jtm, tmc);
     }
 
@@ -51,7 +52,8 @@ public class EclipseLinkJpaConfig extends JpaBaseConfiguration {
     protected Map<String, Object> getVendorProperties() {
         Map<String, Object> map = new HashMap<>();
         map.put(PersistenceUnitProperties.WEAVING, "false");
-        map.put(PersistenceUnitProperties.DDL_GENERATION, "create-or-extend-tables");
+        map.put(PersistenceUnitProperties.DDL_GENERATION,
+                "create-or-extend-tables");
         return map;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java b/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java
index ebd5a46..8c075a4 100644
--- a/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java
@@ -74,7 +74,8 @@ public class EnvConfig {
      * @return String
      * @throws IOException io exception
      */
-    private static String readEnvFromAbsolutePath(String path) throws IOException {
+    private static String readEnvFromAbsolutePath(String path)
+            throws IOException {
         if (path == null) {
             LOGGER.warn("Parameter path is null.");
             return null;
@@ -101,7 +102,8 @@ public class EnvConfig {
      * @return String
      * @throws IOException io exception
      */
-    static String getBatchEnv(String name, String defaultPath, String location) throws IOException {
+    static String getBatchEnv(String name, String defaultPath, String location)
+            throws IOException {
         if (ENV_BATCH != null) {
             return ENV_BATCH;
         }
@@ -116,7 +118,9 @@ public class EnvConfig {
         return ENV_BATCH;
     }
 
-    static String getStreamingEnv(String name, String defaultPath, String location)
+    static String getStreamingEnv(String name,
+                                  String defaultPath,
+                                  String location)
             throws IOException {
         if (ENV_STREAMING != null) {
             return ENV_STREAMING;

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java b/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java
index 57066a8..ca43751 100644
--- a/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java
@@ -47,7 +47,8 @@ public class LoginConfig {
             case "default":
                 return new LoginServiceDefaultImpl();
             case "ldap":
-                return new LoginServiceLdapImpl(url, email, searchBase, searchPattern);
+                return new LoginServiceLdapImpl(url, email, searchBase,
+                        searchPattern);
             default:
                 return null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
index 16e5525..3bdb02b 100644
--- a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
@@ -45,7 +45,8 @@ import org.springframework.core.io.ClassPathResource;
 @Configuration
 public class PropertiesConfig {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(PropertiesConfig.class);
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(PropertiesConfig.class);
 
     public static Map<String, Object> livyConfMap;
 
@@ -53,9 +54,13 @@ public class PropertiesConfig {
 
     private String envLocation;
 
-    public PropertiesConfig(@Value("${external.config.location}") String configLocation, @Value("${external.env.location}") String envLocation) {
-        LOGGER.info("external.config.location : {}", configLocation != null ? configLocation : "null");
-        LOGGER.info("external.env.location : {}", envLocation != null ? envLocation : "null");
+    public PropertiesConfig(
+            @Value("${external.config.location}") String configLocation,
+            @Value("${external.env.location}") String envLocation) {
+        LOGGER.info("external.config.location : {}",
+                configLocation != null ? configLocation : "null");
+        LOGGER.info("external.env.location : {}",
+                envLocation != null ? envLocation : "null");
         this.configLocation = configLocation;
         this.envLocation = envLocation;
     }
@@ -73,10 +78,12 @@ public class PropertiesConfig {
     }
 
     /**
-     * Config quartz.properties will be replaced if it's found in external.config.location setting.
+     * Config quartz.properties will be replaced if it's found in external
+     * .config.location setting.
      *
      * @return Properties
-     * @throws FileNotFoundException It'll throw FileNotFoundException when path is wrong.
+     * @throws FileNotFoundException It'll throw FileNotFoundException
+     *                               when path is wrong.
      */
     @Bean(name = "quartzConf")
     public Properties quartzConf() throws FileNotFoundException {
@@ -85,7 +92,10 @@ public class PropertiesConfig {
         return getConf(name, defaultPath, configLocation);
     }
 
-    private static void genLivyConf(String name, String defaultPath, String location) throws IOException {
+    private static void genLivyConf(
+            String name,
+            String defaultPath,
+            String location) throws IOException {
         if (livyConfMap != null) {
             return;
         }
@@ -106,13 +116,15 @@ public class PropertiesConfig {
      * @return Map
      * @throws IOException io exception
      */
-    private static Map<String, Object> readPropertiesFromResource(String path) throws IOException {
+    private static Map<String, Object> readPropertiesFromResource(String path)
+            throws IOException {
         if (path == null) {
             LOGGER.warn("Parameter path is null.");
             return null;
         }
-        // Be careful, here we use getInputStream() to convert path file to stream.
-        // It'll cause FileNotFoundException if you use  getFile() to convert path file to File Object
+        // Be careful, here we use getInputStream() to convert path file to
+        // stream. It'll cause FileNotFoundException if you use  getFile()
+        // to convert path file to File Object
         InputStream in = new ClassPathResource(path).getInputStream();
         return toEntity(in, new TypeReference<Map<String, Object>>() {
         });

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java b/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java
index 8c71e89..7b6af51 100644
--- a/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java
@@ -43,13 +43,15 @@ public class SchedulerConfig {
 
     @Bean
     public JobFactory jobFactory(ApplicationContext applicationContext) {
-        AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory();
+        AutowiringSpringBeanJobFactory jobFactory =
+                new AutowiringSpringBeanJobFactory();
         jobFactory.setApplicationContext(applicationContext);
         return jobFactory;
     }
 
     @Bean
-    public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource, JobFactory jobFactory) {
+    public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource,
+                                                     JobFactory jobFactory) {
         SchedulerFactoryBean factory = new SchedulerFactoryBean();
         factory.setOverwriteExistingJobs(true);
         factory.setDataSource(dataSource);

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java
index 0a105b4..d987da7 100644
--- a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java
+++ b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java
@@ -33,22 +33,29 @@ public class GriffinExceptionHandler {
 
     @SuppressWarnings("rawtypes")
     @ExceptionHandler(GriffinException.ServiceException.class)
-    public ResponseEntity handleGriffinExceptionOfServer(HttpServletRequest request, GriffinException.ServiceException e) {
+    public ResponseEntity handleGriffinExceptionOfServer(
+            HttpServletRequest request,
+            GriffinException.ServiceException e) {
         String message = e.getMessage();
         Throwable cause = e.getCause();
-        GriffinExceptionResponse body = new GriffinExceptionResponse(HttpStatus.INTERNAL_SERVER_ERROR,
+        GriffinExceptionResponse body = new GriffinExceptionResponse(
+                HttpStatus.INTERNAL_SERVER_ERROR,
                 message, request.getRequestURI(), cause.getClass().getName());
         return new ResponseEntity<>(body, HttpStatus.INTERNAL_SERVER_ERROR);
     }
 
     @SuppressWarnings("rawtypes")
     @ExceptionHandler(GriffinException.class)
-    public ResponseEntity handleGriffinExceptionOfClient(HttpServletRequest request, GriffinException e) {
-        ResponseStatus responseStatus = AnnotationUtils.findAnnotation(e.getClass(), ResponseStatus.class);
+    public ResponseEntity handleGriffinExceptionOfClient(
+            HttpServletRequest request, GriffinException e) {
+        ResponseStatus responseStatus = AnnotationUtils.findAnnotation(
+                e.getClass(), ResponseStatus.class);
         HttpStatus status = responseStatus.code();
         String code = e.getMessage();
-        GriffinExceptionMessage message = GriffinExceptionMessage.valueOf(Integer.valueOf(code));
-        GriffinExceptionResponse body = new GriffinExceptionResponse(status, message, request.getRequestURI());
+        GriffinExceptionMessage message = GriffinExceptionMessage
+                .valueOf(Integer.valueOf(code));
+        GriffinExceptionResponse body = new GriffinExceptionResponse(
+                status, message, request.getRequestURI());
         return new ResponseEntity<>(body, status);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
index 14d987d..ae6a0ea 100644
--- a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
+++ b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
@@ -22,35 +22,57 @@ package org.apache.griffin.core.exception;
 public enum GriffinExceptionMessage {
 
     //400, "Bad Request"
-    MEASURE_TYPE_DOES_NOT_MATCH(40001, "Property 'measure.type' does not match the type of measure in request body"),
-    INVALID_CONNECTOR_NAME(40002, "Property 'name' in 'connectors' field is invalid"),
+    MEASURE_TYPE_DOES_NOT_MATCH(40001, "Property 'measure.type' does not match"
+            + " the type of measure in request body"),
+    INVALID_CONNECTOR_NAME(40002, "Property 'name' in 'connectors' " +
+            "field is invalid"),
     MISSING_METRIC_NAME(40003, "Missing property 'metricName'"),
     INVALID_JOB_NAME(40004, "Property 'job.name' is invalid"),
     MISSING_BASELINE_CONFIG(40005, "Missing 'as.baseline' config in 'data.segments'"),
+
     INVALID_METRIC_RECORDS_OFFSET(40006, "Offset must not be less than zero"),
+
     INVALID_METRIC_RECORDS_SIZE(40007, "Size must not be less than zero"),
+
     INVALID_METRIC_VALUE_FORMAT(40008, "Metric value format is invalid"),
+
     INVALID_MEASURE_ID(40009, "Property 'measure.id' is invalid"),
+
     INVALID_CRON_EXPRESSION(40010, "Property 'cron.expression' is invalid"),
+
     MEASURE_TYPE_DOES_NOT_SUPPORT(40011, "We don't support such measure type."),
+
     JOB_TYPE_DOES_NOT_SUPPORT(40011, "We don't support such job type."),
-    STREAMING_JOB_IS_RUNNING(40012, "There is no need to start again as job is RUNNING."),
-    STREAMING_JOB_IS_STOPPED(40012, "There is no need to stop again as job is STOPPED."),
+
+    STREAMING_JOB_IS_RUNNING(40012, "There is no need to start again " +
+            "as job is RUNNING."),
+    STREAMING_JOB_IS_STOPPED(40012, "There is no need to stop again " +
+            "as job is STOPPED."),
     JOB_IS_NOT_SCHEDULED(40013, "The job isn't scheduled."),
+
     JOB_IS_NOT_IN_PAUSED_STATUS(40014, "The job isn't in paused status."),
+
     JOB_IS_IN_PAUSED_STATUS(40015, "The job is already in paused status."),
 
     //404, "Not Found"
     MEASURE_ID_DOES_NOT_EXIST(40401, "Measure id does not exist"),
+
     JOB_ID_DOES_NOT_EXIST(40402, "Job id does not exist"),
+
     JOB_NAME_DOES_NOT_EXIST(40403, "Job name does not exist"),
+
     NO_SUCH_JOB_ACTION(40404, "No such job action"),
-    JOB_KEY_DOES_NOT_EXIST(40405, "Job key which consists of group and name does not exist."),
-    ORGANIZATION_NAME_DOES_NOT_EXIST(40406, "Organization name does not exist"),
+
+    JOB_KEY_DOES_NOT_EXIST(40405, "Job key which consists of " +
+            "group and name does not exist."),
+    ORGANIZATION_NAME_DOES_NOT_EXIST(40406, "Organization name " +
+            "does not exist"),
+
     HDFS_FILE_NOT_EXIST(40407, "Hadoop data file not exist"),
 
     //409, "Conflict"
     MEASURE_NAME_ALREADY_EXIST(40901, "Measure name already exists"),
+
     QUARTZ_JOB_ALREADY_EXIST(40902, "Quartz job already exist");
 
     private final int code;
@@ -70,7 +92,8 @@ public enum GriffinExceptionMessage {
                 return message;
             }
         }
-        throw new IllegalArgumentException("No matching constant for [" + code + "]");
+        throw new IllegalArgumentException("No matching constant for ["
+                + code + "]");
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionResponse.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionResponse.java b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionResponse.java
index db8f766..47e28da 100644
--- a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionResponse.java
+++ b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionResponse.java
@@ -37,7 +37,8 @@ public class GriffinExceptionResponse {
     private String path;
 
 
-    GriffinExceptionResponse(HttpStatus status, GriffinExceptionMessage message, String path) {
+    GriffinExceptionResponse(HttpStatus status, GriffinExceptionMessage message,
+                             String path) {
         this.status = status.value();
         this.error = status.getReasonPhrase();
         this.code = Integer.toString(message.getCode());
@@ -45,7 +46,8 @@ public class GriffinExceptionResponse {
         this.path = path;
     }
 
-    GriffinExceptionResponse(HttpStatus status, String message, String path, String exception) {
+    GriffinExceptionResponse(HttpStatus status, String message, String path,
+                             String exception) {
         this.status = status.value();
         this.error = status.getReasonPhrase();
         this.message = message;

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
index bc73cd8..5b2f816 100644
--- a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
@@ -51,7 +51,8 @@ import org.springframework.util.StringUtils;
 
 @Service
 public class BatchJobOperatorImpl implements JobOperator {
-    private static final Logger LOGGER = LoggerFactory.getLogger(BatchJobOperatorImpl.class);
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(BatchJobOperatorImpl.class);
 
     @Autowired
     private SchedulerFactoryBean factory;
@@ -64,7 +65,8 @@ public class BatchJobOperatorImpl implements JobOperator {
 
     @Override
     @Transactional(rollbackFor = Exception.class)
-    public AbstractJob add(AbstractJob job, GriffinMeasure measure) throws Exception {
+    public AbstractJob add(AbstractJob job, GriffinMeasure measure)
+            throws Exception {
         validateParams(job, measure);
         String qName = jobService.getQuartzName(job);
         String qGroup = jobService.getQuartzGroup();
@@ -75,7 +77,9 @@ public class BatchJobOperatorImpl implements JobOperator {
         return job;
     }
 
-    private BatchJob genBatchJobBean(AbstractJob job, String qName, String qGroup) {
+    private BatchJob genBatchJobBean(AbstractJob job,
+                                     String qName,
+                                     String qGroup) {
         BatchJob batchJob = (BatchJob) job;
         batchJob.setMetricName(job.getJobName());
         batchJob.setGroup(qGroup);
@@ -96,17 +100,21 @@ public class BatchJobOperatorImpl implements JobOperator {
         String group = job.getGroup();
         TriggerState state = getTriggerState(name, group);
         if (state == null) {
-            throw new GriffinException.BadRequestException(JOB_IS_NOT_SCHEDULED);
+            throw new GriffinException.BadRequestException(
+                    JOB_IS_NOT_SCHEDULED);
         }
-        /* If job is not in paused state,we can't start it as it may be RUNNING. */
+        /* If job is not in paused state,we can't start it
+        as it may be RUNNING.*/
         if (state != PAUSED) {
-            throw new GriffinException.BadRequestException(JOB_IS_NOT_IN_PAUSED_STATUS);
+            throw new GriffinException.BadRequestException
+                    (JOB_IS_NOT_IN_PAUSED_STATUS);
         }
         JobKey jobKey = jobKey(name, group);
         try {
             factory.getScheduler().resumeJob(jobKey);
         } catch (SchedulerException e) {
-            throw new GriffinException.ServiceException("Failed to start job.", e);
+            throw new GriffinException.ServiceException(
+                    "Failed to start job.", e);
         }
     }
 
@@ -123,19 +131,23 @@ public class BatchJobOperatorImpl implements JobOperator {
 
 
     @Override
-    public JobHealth getHealth(JobHealth jobHealth, AbstractJob job) throws SchedulerException {
-        List<? extends Trigger> triggers = jobService.getTriggers(job.getName(), job.getGroup());
+    public JobHealth getHealth(JobHealth jobHealth, AbstractJob job)
+            throws SchedulerException {
+        List<? extends Trigger> triggers = jobService
+                .getTriggers(job.getName(), job.getGroup());
         if (!CollectionUtils.isEmpty(triggers)) {
             jobHealth.setJobCount(jobHealth.getJobCount() + 1);
             if (jobService.isJobHealthy(job.getId())) {
-                jobHealth.setHealthyJobCount(jobHealth.getHealthyJobCount() + 1);
+                jobHealth.setHealthyJobCount(
+                        jobHealth.getHealthyJobCount() + 1);
             }
         }
         return jobHealth;
     }
 
     @Override
-    public JobState getState(AbstractJob job, String action) throws SchedulerException {
+    public JobState getState(AbstractJob job, String action)
+            throws SchedulerException {
         JobState jobState = new JobState();
         Scheduler scheduler = factory.getScheduler();
         if (job.getGroup() == null || job.getName() == null) {
@@ -150,24 +162,30 @@ public class BatchJobOperatorImpl implements JobOperator {
         return jobState;
     }
 
-    private void setTriggerTime(AbstractJob job, JobState jobState) throws SchedulerException {
-        List<? extends Trigger> triggers = jobService.getTriggers(job.getName(), job.getGroup());
-        // If triggers are empty, in Griffin it means job is completed whose trigger state is NONE or not scheduled.
+    private void setTriggerTime(AbstractJob job, JobState jobState)
+            throws SchedulerException {
+        List<? extends Trigger> triggers = jobService
+                .getTriggers(job.getName(), job.getGroup());
+        // If triggers are empty, in Griffin it means job is completed whose
+        // trigger state is NONE or not scheduled.
         if (CollectionUtils.isEmpty(triggers)) {
             return;
         }
         Trigger trigger = triggers.get(0);
         Date nextFireTime = trigger.getNextFireTime();
         Date previousFireTime = trigger.getPreviousFireTime();
-        jobState.setNextFireTime(nextFireTime != null ? nextFireTime.getTime() : -1);
-        jobState.setPreviousFireTime(previousFireTime != null ? previousFireTime.getTime() : -1);
+        jobState.setNextFireTime(nextFireTime != null ?
+                nextFireTime.getTime() : -1);
+        jobState.setPreviousFireTime(previousFireTime != null ?
+                previousFireTime.getTime() : -1);
     }
 
     /**
      * only PAUSED state of job can be started
      *
      * @param state job state
-     * @return true: job can be started, false: job is running which cannot be started
+     * @return true: job can be started, false: job is running which cannot be
+     * started
      */
     private boolean getStartStatus(TriggerState state) {
         return state == PAUSED;
@@ -177,7 +195,8 @@ public class BatchJobOperatorImpl implements JobOperator {
      * only NORMAL or  BLOCKED state of job can be started
      *
      * @param state job state
-     * @return true: job can be stopped, false: job is running which cannot be stopped
+     * @return true: job can be stopped, false: job is running which cannot be
+     * stopped
      */
     private boolean getStopStatus(TriggerState state) {
         return state == NORMAL || state == BLOCKED;
@@ -186,7 +205,8 @@ public class BatchJobOperatorImpl implements JobOperator {
 
     private TriggerState getTriggerState(String name, String group) {
         try {
-            List<? extends Trigger> triggers = jobService.getTriggers(name, group);
+            List<? extends Trigger> triggers = jobService.getTriggers(name,
+                    group);
             if (CollectionUtils.isEmpty(triggers)) {
                 return null;
             }
@@ -194,7 +214,8 @@ public class BatchJobOperatorImpl implements JobOperator {
             return factory.getScheduler().getTriggerState(key);
         } catch (SchedulerException e) {
             LOGGER.error("Failed to delete job", e);
-            throw new GriffinException.ServiceException("Failed to delete job", e);
+            throw new GriffinException
+                    .ServiceException("Failed to delete job", e);
         }
 
     }
@@ -202,7 +223,8 @@ public class BatchJobOperatorImpl implements JobOperator {
 
     /**
      * @param job    griffin job
-     * @param delete if job needs to be deleted,set isNeedDelete true,otherwise it just will be paused.
+     * @param delete if the job needs to be deleted, set delete true;
+     *               otherwise it will just be paused.
      */
     private void pauseJob(BatchJob job, boolean delete) {
         try {
@@ -212,7 +234,8 @@ public class BatchJobOperatorImpl implements JobOperator {
             batchJobRepo.save(job);
         } catch (Exception e) {
             LOGGER.error("Job schedule happens exception.", e);
-            throw new GriffinException.ServiceException("Job schedule happens exception.", e);
+            throw new GriffinException.ServiceException("Job schedule " +
+                    "happens exception.", e);
         }
     }
 
@@ -220,7 +243,8 @@ public class BatchJobOperatorImpl implements JobOperator {
         List<JobInstanceBean> instances = instanceRepo.findByJobId(job.getId());
         for (JobInstanceBean instance : instances) {
             if (!instance.isPredicateDeleted()) {
-                deleteJob(instance.getPredicateGroup(), instance.getPredicateName());
+                deleteJob(instance.getPredicateGroup(), instance
+                        .getPredicateName());
                 instance.setPredicateDeleted(true);
                 if (instance.getState().equals(LivySessionStates.State.FINDING)) {
                     instance.setState(LivySessionStates.State.NOT_FOUND);
@@ -233,7 +257,8 @@ public class BatchJobOperatorImpl implements JobOperator {
         Scheduler scheduler = factory.getScheduler();
         JobKey jobKey = new JobKey(name, group);
         if (!scheduler.checkExists(jobKey)) {
-            LOGGER.info("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName());
+            LOGGER.info("Job({},{}) does not exist.", jobKey.getGroup(), jobKey
+                    .getName());
             return;
         }
         scheduler.deleteJob(jobKey);
@@ -247,8 +272,10 @@ public class BatchJobOperatorImpl implements JobOperator {
         Scheduler scheduler = factory.getScheduler();
         JobKey jobKey = new JobKey(name, group);
         if (!scheduler.checkExists(jobKey)) {
-            LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName());
-            throw new GriffinException.NotFoundException(JOB_KEY_DOES_NOT_EXIST);
+            LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), jobKey
+                    .getName());
+            throw new GriffinException.NotFoundException
+                    (JOB_KEY_DOES_NOT_EXIST);
         }
         scheduler.pauseJob(jobKey);
     }
@@ -267,7 +294,8 @@ public class BatchJobOperatorImpl implements JobOperator {
         return pauseStatus;
     }
 
-    private boolean pauseJobInstance(JobInstanceBean instance, List<JobInstanceBean> deletedInstances) {
+    private boolean pauseJobInstance(JobInstanceBean instance,
+                                     List<JobInstanceBean> deletedInstances) {
         boolean status = true;
         String pGroup = instance.getPredicateGroup();
         String pName = instance.getPredicateName();
@@ -278,7 +306,8 @@ public class BatchJobOperatorImpl implements JobOperator {
                 deletedInstances.add(instance);
             }
         } catch (SchedulerException e) {
-            LOGGER.error("Failed to pause predicate job({},{}).", pGroup, pName);
+            LOGGER.error("Failed to pause predicate job({},{}).", pGroup,
+                    pName);
             status = false;
         }
         return status;
@@ -289,14 +318,17 @@ public class BatchJobOperatorImpl implements JobOperator {
             throw new GriffinException.BadRequestException(INVALID_JOB_NAME);
         }
         if (!isValidCronExpression(job.getCronExpression())) {
-            throw new GriffinException.BadRequestException(INVALID_CRON_EXPRESSION);
+            throw new GriffinException.BadRequestException
+                    (INVALID_CRON_EXPRESSION);
         }
         if (!isValidBaseLine(job.getSegments())) {
-            throw new GriffinException.BadRequestException(MISSING_BASELINE_CONFIG);
+            throw new GriffinException.BadRequestException
+                    (MISSING_BASELINE_CONFIG);
         }
         List<String> names = getConnectorNames(measure);
         if (!isValidConnectorNames(job.getSegments(), names)) {
-            throw new GriffinException.BadRequestException(INVALID_CONNECTOR_NAME);
+            throw new GriffinException.BadRequestException
+                    (INVALID_CONNECTOR_NAME);
         }
     }
 
@@ -319,24 +351,29 @@ public class BatchJobOperatorImpl implements JobOperator {
                 return true;
             }
         }
-        LOGGER.warn("Please set segment timestamp baseline in as.baseline field.");
+        LOGGER.warn("Please set segment timestamp baseline " +
+                "in as.baseline field.");
         return false;
     }
 
-    private boolean isValidConnectorNames(List<JobDataSegment> segments, List<String> names) {
+    private boolean isValidConnectorNames(List<JobDataSegment> segments,
+                                          List<String> names) {
         assert segments != null;
         Set<String> sets = new HashSet<>();
         for (JobDataSegment segment : segments) {
             String dcName = segment.getDataConnectorName();
             sets.add(dcName);
-            boolean exist = names.stream().anyMatch(name -> name.equals(dcName));
+            boolean exist = names.stream().anyMatch(name -> name.equals
+                    (dcName));
             if (!exist) {
-                LOGGER.warn("Param {} is a illegal string. Please input one of strings in {}.", dcName, names);
+                LOGGER.warn("Param {} is a illegal string. " +
+                        "Please input one of strings in {}.", dcName, names);
                 return false;
             }
         }
         if (sets.size() < segments.size()) {
-            LOGGER.warn("Connector names in job data segment cannot duplicate.");
+            LOGGER.warn("Connector names in job data segment " +
+                    "cannot duplicate.");
             return false;
         }
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
index 35f981f..703a837 100644
--- a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
+++ b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
@@ -32,7 +32,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class FileExistPredicator implements Predicator {
-    private static final Logger LOGGER = LoggerFactory.getLogger(FileExistPredicator.class);
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(FileExistPredicator.class);
 
     private static final String PREDICT_PATH = "path";
     private static final String PREDICT_ROOT_PATH = "root.path";
@@ -48,12 +49,15 @@ public class FileExistPredicator implements Predicator {
         Map<String, Object> config = predicate.getConfigMap();
         String[] paths = null;
         String rootPath = null;
-        if (config != null && !StringUtils.isEmpty((String) config.get(PREDICT_PATH))) {
-            paths = ((String) config.get(PREDICT_PATH)).split(PATH_CONNECTOR_CHARACTER);
+        if (config != null && !StringUtils.isEmpty((String) config.get
+                (PREDICT_PATH))) {
+            paths = ((String) config.get(PREDICT_PATH)).split
+                    (PATH_CONNECTOR_CHARACTER);
             rootPath = (String) config.get(PREDICT_ROOT_PATH);
         }
         if (ArrayUtils.isEmpty(paths) || StringUtils.isEmpty(rootPath)) {
-            LOGGER.error("Predicate path is null.Please check predicates config root.path and path.");
+            LOGGER.error("Predicate path is null.Please check predicates " +
+                    "config root.path and path.");
             throw new NullPointerException();
         }
         for (String path : paths) {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/JobController.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobController.java b/service/src/main/java/org/apache/griffin/core/job/JobController.java
index 64b8e42..f4ee791 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobController.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobController.java
@@ -48,7 +48,8 @@ public class JobController {
     private JobService jobService;
 
     @RequestMapping(value = "/jobs", method = RequestMethod.GET)
-    public List<AbstractJob> getJobs(@RequestParam(value = "type", defaultValue = "") String type) {
+    public List<AbstractJob> getJobs(@RequestParam(value = "type",
+            defaultValue = "") String type) {
         return jobService.getAliveJobs(type);
     }
 
@@ -65,24 +66,31 @@ public class JobController {
 
     @RequestMapping(value = "/jobs/{id}", method = RequestMethod.PUT)
     @ResponseStatus(HttpStatus.OK)
-    public AbstractJob onActions(@PathVariable("id") Long jobId, @RequestParam String action) throws Exception {
+    public AbstractJob onActions(
+            @PathVariable("id") Long jobId,
+            @RequestParam String action) throws Exception {
         return jobService.onAction(jobId, action);
     }
 
     @RequestMapping(value = "/jobs", method = RequestMethod.DELETE)
     @ResponseStatus(HttpStatus.NO_CONTENT)
-    public void deleteJob(@RequestParam("jobName") String jobName) throws SchedulerException {
+    public void deleteJob(@RequestParam("jobName") String jobName)
+            throws SchedulerException {
         jobService.deleteJob(jobName);
     }
 
     @RequestMapping(value = "/jobs/{id}", method = RequestMethod.DELETE)
     @ResponseStatus(HttpStatus.NO_CONTENT)
-    public void deleteJob(@PathVariable("id") Long id) throws SchedulerException {
+    public void deleteJob(@PathVariable("id") Long id)
+            throws SchedulerException {
         jobService.deleteJob(id);
     }
 
     @RequestMapping(value = "/jobs/instances", method = RequestMethod.GET)
-    public List<JobInstanceBean> findInstancesOfJob(@RequestParam("jobId") Long id, @RequestParam("page") int page, @RequestParam("size") int size) {
+    public List<JobInstanceBean> findInstancesOfJob(
+            @RequestParam("jobId") Long id,
+            @RequestParam("page") int page,
+            @RequestParam("size") int size) {
         return jobService.findInstancesOfJob(id, page, size);
     }
 
@@ -92,11 +100,16 @@ public class JobController {
     }
 
     @RequestMapping(path = "/jobs/download", method = RequestMethod.GET)
-    public ResponseEntity<Resource> download(@RequestParam("jobName") String jobName, @RequestParam("ts") long timestamp) throws Exception {
+    public ResponseEntity<Resource> download(
+        @RequestParam("jobName") String jobName,
+        @RequestParam("ts") long timestamp)
+        throws Exception {
         String path = jobService.getJobHdfsSinksPath(jobName, timestamp);
-        InputStreamResource resource = new InputStreamResource(FSUtil.getMissSampleInputStream(path));
+        InputStreamResource resource = new InputStreamResource(
+            FSUtil.getMissSampleInputStream(path));
         return ResponseEntity.ok().
-                header("content-disposition", "attachment; filename = sampleMissingData.json")
+                header("content-disposition",
+                        "attachment; filename = sampleMissingData.json")
                 .contentType(MediaType.APPLICATION_OCTET_STREAM)
                 .body(resource);
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
index ab67c81..afb25d8 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
@@ -78,7 +78,8 @@ import org.springframework.transaction.annotation.Transactional;
 @PersistJobDataAfterExecution
 @DisallowConcurrentExecution
 public class JobInstance implements Job {
-    private static final Logger LOGGER = LoggerFactory.getLogger(JobInstance.class);
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(JobInstance.class);
     public static final String MEASURE_KEY = "measure";
     public static final String PREDICATES_KEY = "predicts";
     public static final String PREDICATE_JOB_NAME = "predicateJobName";
@@ -86,7 +87,8 @@ public class JobInstance implements Job {
     static final String PATH_CONNECTOR_CHARACTER = ",";
     public static final String INTERVAL = "interval";
     public static final String REPEAT = "repeat";
-    public static final String CHECK_DONEFILE_SCHEDULE = "checkdonefile.schedule";
+    public static final String CHECK_DONEFILE_SCHEDULE =
+            "checkdonefile.schedule";
 
     @Autowired
     private SchedulerFactoryBean factory;
@@ -104,7 +106,6 @@ public class JobInstance implements Job {
     private List<SegmentPredicate> mPredicates;
     private Long jobStartTime;
 
-
     @Override
     @Transactional
     public void execute(JobExecutionContext context) {
@@ -117,7 +118,8 @@ public class JobInstance implements Job {
         }
     }
 
-    private void initParam(JobExecutionContext context) throws SchedulerException {
+    private void initParam(JobExecutionContext context)
+            throws SchedulerException {
         mPredicates = new ArrayList<>();
         JobDetail jobDetail = context.getJobDetail();
         Long jobId = jobDetail.getJobDataMap().getLong(GRIFFIN_JOB_ID);
@@ -128,20 +130,24 @@ public class JobInstance implements Job {
     }
 
     @SuppressWarnings("unchecked")
-    private void setJobStartTime(JobDetail jobDetail) throws SchedulerException {
+    private void setJobStartTime(JobDetail jobDetail)
+            throws SchedulerException {
         Scheduler scheduler = factory.getScheduler();
         JobKey jobKey = jobDetail.getKey();
-        List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey);
+        List<Trigger> triggers =
+                (List<Trigger>) scheduler.getTriggersOfJob(jobKey);
         Date triggerTime = triggers.get(0).getPreviousFireTime();
         jobStartTime = triggerTime.getTime();
     }
 
 
-    private void setSourcesPartitionsAndPredicates(List<DataSource> sources) throws Exception {
+    private void setSourcesPartitionsAndPredicates(List<DataSource> sources)
+            throws Exception {
         boolean isFirstBaseline = true;
         for (JobDataSegment jds : job.getSegments()) {
             if (jds.isAsTsBaseline() && isFirstBaseline) {
-                Long tsOffset = TimeUtil.str2Long(jds.getSegmentRange().getBegin());
+                Long tsOffset = TimeUtil.str2Long(
+                    jds.getSegmentRange().getBegin());
                 measure.setTimestamp(jobStartTime + tsOffset);
                 isFirstBaseline = false;
             }
@@ -151,7 +157,8 @@ public class JobInstance implements Job {
         }
     }
 
-    private void setDataSourcePartitions(JobDataSegment jds, DataSource ds) throws Exception {
+    private void setDataSourcePartitions(JobDataSegment jds, DataSource ds)
+            throws Exception {
         List<DataConnector> connectors = ds.getConnectors();
         for (DataConnector dc : connectors) {
             setDataConnectorPartitions(jds, dc);
@@ -159,7 +166,9 @@ public class JobInstance implements Job {
     }
 
 
-    private void setDataConnectorPartitions(JobDataSegment jds, DataConnector dc) throws Exception {
+    private void setDataConnectorPartitions(
+            JobDataSegment jds,
+            DataConnector dc) throws Exception {
         String dcName = jds.getDataConnectorName();
         if (dcName.equals(dc.getName())) {
             Long[] sampleTs = genSampleTs(jds.getSegmentRange(), dc);
@@ -179,7 +188,8 @@ public class JobInstance implements Job {
         Long offset = TimeUtil.str2Long(segRange.getBegin());
         Long range = TimeUtil.str2Long(segRange.getLength());
         String unit = dc.getDataUnit();
-        Long dataUnit = TimeUtil.str2Long(StringUtils.isEmpty(unit) ? dc.getDefaultDataUnit() : unit);
+        Long dataUnit = TimeUtil.str2Long(StringUtils.isEmpty(unit) ? dc
+                .getDefaultDataUnit() : unit);
         //offset usually is negative
         Long dataStartTime = jobStartTime + offset;
         if (range < 0) {
@@ -206,7 +216,9 @@ public class JobInstance implements Job {
     private void setConnectorPredicates(DataConnector dc, Long[] sampleTs) {
         List<SegmentPredicate> predicates = dc.getPredicates();
         for (SegmentPredicate predicate : predicates) {
-            genConfMap(predicate.getConfigMap(), sampleTs, dc.getDataTimeZone());
+            genConfMap(predicate.getConfigMap(),
+                    sampleTs,
+                    dc.getDataTimeZone());
             //Do not forget to update origin string config
             predicate.setConfigMap(predicate.getConfigMap());
             mPredicates.add(predicate);
@@ -222,11 +234,15 @@ public class JobInstance implements Job {
     /**
      * @param conf     config map
      * @param sampleTs collection of data split start timestamp
-     * @return all config data combine,like {"where": "year=2017 AND month=11 AND dt=15 AND hour=09,year=2017 AND month=11 AND dt=15 AND hour=10"}
-     * or like {"path": "/year=2017/month=11/dt=15/hour=09/_DONE,/year=2017/month=11/dt=15/hour=10/_DONE"}
+     * @return all config data combine,like {"where": "year=2017 AND month=11
+     * AND dt=15 AND hour=09,year=2017 AND month=11 AND
+     * dt=15 AND hour=10"}
+     * or like {"path": "/year=2017/month=11/dt=15/hour=09/_DONE
+     * ,/year=2017/month=11/dt=15/hour=10/_DONE"}
      */
 
-    private void genConfMap(Map<String, Object> conf, Long[] sampleTs, String timezone) {
+    private void genConfMap(Map<String, Object> conf, Long[] sampleTs, String
+            timezone) {
         if (conf == null) {
             LOGGER.warn("Predicate config is null.");
             return;
@@ -240,9 +256,11 @@ public class JobInstance implements Job {
                     continue;
                 }
                 for (Long timestamp : sampleTs) {
-                    set.add(TimeUtil.format(value, timestamp, getTimeZone(timezone)));
+                    set.add(TimeUtil.format(value, timestamp,
+                            getTimeZone(timezone)));
                 }
-                conf.put(entry.getKey(), StringUtils.join(set, PATH_CONNECTOR_CHARACTER));
+                conf.put(entry.getKey(), StringUtils.join(set,
+                        PATH_CONNECTOR_CHARACTER));
             }
         }
     }
@@ -255,16 +273,20 @@ public class JobInstance implements Job {
     }
 
     @SuppressWarnings("unchecked")
-    private void createJobInstance(Map<String, Object> confMap) throws Exception {
+    private void createJobInstance(Map<String, Object> confMap)
+            throws Exception {
         confMap = checkConfMap(confMap != null ? confMap : new HashMap<>());
-        Map<String, Object> config = (Map<String, Object>) confMap.get(CHECK_DONEFILE_SCHEDULE);
+        Map<String, Object> config = (Map<String, Object>) confMap
+                .get(CHECK_DONEFILE_SCHEDULE);
         Long interval = TimeUtil.str2Long((String) config.get(INTERVAL));
         Integer repeat = Integer.valueOf(config.get(REPEAT).toString());
         String groupName = "PG";
-        String jobName = job.getJobName() + "_predicate_" + System.currentTimeMillis();
+        String jobName = job.getJobName() + "_predicate_" + System
+                .currentTimeMillis();
         TriggerKey tk = triggerKey(jobName, groupName);
         if (factory.getScheduler().checkExists(tk)) {
-            throw new GriffinException.ConflictException(QUARTZ_JOB_ALREADY_EXIST);
+            throw new GriffinException.ConflictException
+                    (QUARTZ_JOB_ALREADY_EXIST);
         }
         saveJobInstance(jobName, groupName);
         createJobInstance(tk, interval, repeat, jobName);
@@ -272,7 +294,8 @@ public class JobInstance implements Job {
 
     @SuppressWarnings("unchecked")
     Map<String, Object> checkConfMap(Map<String, Object> confMap) {
-        Map<String, Object> config = (Map<String, Object>) confMap.get(CHECK_DONEFILE_SCHEDULE);
+        Map<String, Object> config = (Map<String, Object>) confMap.get
+                (CHECK_DONEFILE_SCHEDULE);
         String interval = env.getProperty("predicate.job.interval");
         interval = interval != null ? interval : "5m";
         String repeat = env.getProperty("predicate.job.repeat.count");
@@ -294,30 +317,38 @@ public class JobInstance implements Job {
     }
 
     private void saveJobInstance(String pName, String pGroup) {
-        ProcessType type = measure.getProcessType() == BATCH ? BATCH : STREAMING;
+        ProcessType type = measure.getProcessType() == BATCH ? BATCH :
+                STREAMING;
         Long tms = System.currentTimeMillis();
         String expired = env.getProperty("jobInstance.expired.milliseconds");
-        Long expireTms = Long.valueOf(expired != null ? expired : "604800000") + tms;
-        JobInstanceBean instance = new JobInstanceBean(FINDING, pName, pGroup, tms, expireTms, type);
+        Long expireTms = Long.valueOf(expired != null ? expired : "604800000")
+                + tms;
+        JobInstanceBean instance = new JobInstanceBean(FINDING, pName, pGroup,
+                tms, expireTms, type);
         instance.setJob(job);
         instanceRepo.save(instance);
     }
 
 
-    private void createJobInstance(TriggerKey tk, Long interval, Integer repeatCount, String pJobName) throws Exception {
+    private void createJobInstance(TriggerKey tk, Long interval, Integer
+            repeatCount, String pJobName) throws Exception {
         JobDetail jobDetail = addJobDetail(tk, pJobName);
-        Trigger trigger = genTriggerInstance(tk, jobDetail, interval, repeatCount);
+        Trigger trigger = genTriggerInstance(tk, jobDetail, interval,
+                repeatCount);
         factory.getScheduler().scheduleJob(trigger);
     }
 
 
-    private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, Long interval, Integer repeatCount) {
+    private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, Long
+            interval, Integer repeatCount) {
         return newTrigger().withIdentity(tk).forJob(jd).startNow()
-                .withSchedule(simpleSchedule().withIntervalInMilliseconds(interval).withRepeatCount(repeatCount))
+                .withSchedule(simpleSchedule().withIntervalInMilliseconds
+                        (interval).withRepeatCount(repeatCount))
                 .build();
     }
 
-    private JobDetail addJobDetail(TriggerKey tk, String pJobName) throws SchedulerException, IOException {
+    private JobDetail addJobDetail(TriggerKey tk, String pJobName)
+            throws SchedulerException, IOException {
         Scheduler scheduler = factory.getScheduler();
         JobKey jobKey = jobKey(tk.getName(), tk.getGroup());
         JobDetail jobDetail;
@@ -335,7 +366,8 @@ public class JobInstance implements Job {
         return jobDetail;
     }
 
-    private void setJobDataMap(JobDetail jobDetail, String pJobName) throws IOException {
+    private void setJobDataMap(JobDetail jobDetail, String pJobName)
+            throws IOException {
         JobDataMap dataMap = jobDetail.getJobDataMap();
         preProcessMeasure();
         String result = toJson(measure);

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobOperator.java b/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
index 7fdbe7d..81c3b17 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
@@ -26,7 +26,8 @@ import org.apache.griffin.core.measure.entity.GriffinMeasure;
 import org.quartz.SchedulerException;
 
 public interface JobOperator {
-    AbstractJob add(AbstractJob job, GriffinMeasure measure) throws Exception;
+    AbstractJob add(AbstractJob job, GriffinMeasure measure)
+            throws Exception;
 
     void start(AbstractJob job) throws Exception;
 
@@ -34,7 +35,9 @@ public interface JobOperator {
 
     void delete(AbstractJob job) throws SchedulerException;
 
-    JobHealth getHealth(JobHealth jobHealth, AbstractJob job) throws SchedulerException;
+    JobHealth getHealth(JobHealth jobHealth, AbstractJob job)
+            throws SchedulerException;
 
-    JobState getState(AbstractJob job, String action) throws SchedulerException;
+    JobState getState(AbstractJob job, String action)
+            throws SchedulerException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
index 5617065..ac14461 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
@@ -101,7 +101,8 @@ import org.springframework.web.client.RestTemplate;
 
 @Service
 public class JobServiceImpl implements JobService {
-    private static final Logger LOGGER = LoggerFactory.getLogger(JobServiceImpl.class);
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(JobServiceImpl.class);
     public static final String GRIFFIN_JOB_ID = "griffinJobId";
     private static final int MAX_PAGE_SIZE = 1024;
     private static final int DEFAULT_PAGE_SIZE = 10;
@@ -156,7 +157,8 @@ public class JobServiceImpl implements JobService {
             }
         } catch (SchedulerException e) {
             LOGGER.error("Failed to get RUNNING jobs.", e);
-            throw new GriffinException.ServiceException("Failed to get RUNNING jobs.", e);
+            throw new GriffinException
+                    .ServiceException("Failed to get RUNNING jobs.", e);
         }
         return dataList;
     }
@@ -174,7 +176,8 @@ public class JobServiceImpl implements JobService {
         AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false);
         if (job == null) {
             LOGGER.warn("Job id {} does not exist.", jobId);
-            throw new GriffinException.NotFoundException(JOB_ID_DOES_NOT_EXIST);
+            throw new GriffinException
+                    .NotFoundException(JOB_ID_DOES_NOT_EXIST);
         }
         return job;
     }
@@ -194,7 +197,8 @@ public class JobServiceImpl implements JobService {
         return job;
     }
 
-    private void doAction(String action, AbstractJob job, JobOperator op) throws Exception {
+    private void doAction(String action, AbstractJob job, JobOperator op)
+            throws Exception {
         switch (action) {
             case START:
                 op.start(job);
@@ -203,7 +207,8 @@ public class JobServiceImpl implements JobService {
                 op.stop(job);
                 break;
             default:
-                throw new GriffinException.NotFoundException(NO_SUCH_JOB_ACTION);
+                throw new GriffinException
+                        .NotFoundException(NO_SUCH_JOB_ACTION);
         }
     }
 
@@ -233,7 +238,8 @@ public class JobServiceImpl implements JobService {
         List<AbstractJob> jobs = jobRepo.findByJobNameAndDeleted(name, false);
         if (CollectionUtils.isEmpty(jobs)) {
             LOGGER.warn("There is no job with '{}' name.", name);
-            throw new GriffinException.NotFoundException(JOB_NAME_DOES_NOT_EXIST);
+            throw new GriffinException
+                    .NotFoundException(JOB_NAME_DOES_NOT_EXIST);
         }
         for (AbstractJob job : jobs) {
             JobOperator op = getJobOperator(job);
@@ -242,16 +248,22 @@ public class JobServiceImpl implements JobService {
     }
 
     @Override
-    public List<JobInstanceBean> findInstancesOfJob(Long jobId, int page, int size) {
+    public List<JobInstanceBean> findInstancesOfJob(
+            Long jobId,
+            int page,
+            int size) {
         AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false);
         if (job == null) {
             LOGGER.warn("Job id {} does not exist.", jobId);
-            throw new GriffinException.NotFoundException(JOB_ID_DOES_NOT_EXIST);
+            throw new GriffinException
+                    .NotFoundException(JOB_ID_DOES_NOT_EXIST);
         }
         size = size > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : size;
         size = size <= 0 ? DEFAULT_PAGE_SIZE : size;
-        Pageable pageable = new PageRequest(page, size, Sort.Direction.DESC, "tms");
-        List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId, pageable);
+        Pageable pageable = new PageRequest(page, size,
+                Sort.Direction.DESC, "tms");
+        List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId,
+                pageable);
         return updateState(instances);
     }
 
@@ -266,7 +278,8 @@ public class JobServiceImpl implements JobService {
     }
 
     /**
-     * a job is regard as healthy job when its latest instance is in healthy state.
+     * A job is regarded as a healthy job when its latest instance is in a
+     * healthy state.
      *
      * @return job healthy statistics
      */
@@ -280,7 +293,8 @@ public class JobServiceImpl implements JobService {
                 jobHealth = op.getHealth(jobHealth, job);
             } catch (SchedulerException e) {
                 LOGGER.error("Job schedule exception. {}", e);
-                throw new GriffinException.ServiceException("Fail to Get HealthInfo", e);
+                throw new GriffinException
+                        .ServiceException("Fail to Get HealthInfo", e);
             }
 
         }
@@ -290,7 +304,9 @@ public class JobServiceImpl implements JobService {
     @Scheduled(fixedDelayString = "${jobInstance.expired.milliseconds}")
     public void deleteExpiredJobInstance() {
         Long timeMills = System.currentTimeMillis();
-        List<JobInstanceBean> instances = instanceRepo.findByExpireTmsLessThanEqual(timeMills);
+        List<JobInstanceBean> instances = instanceRepo
+                .findByExpireTmsLessThanEqual(
+                        timeMills);
         if (!batchJobOp.pauseJobInstances(instances)) {
             LOGGER.error("Pause job failure.");
             return;
@@ -312,7 +328,8 @@ public class JobServiceImpl implements JobService {
         } else if (job instanceof StreamingJob) {
             return streamingJobOp;
         }
-        throw new GriffinException.BadRequestException(JOB_TYPE_DOES_NOT_SUPPORT);
+        throw new GriffinException.BadRequestException(
+                JOB_TYPE_DOES_NOT_SUPPORT);
     }
 
     private JobOperator getJobOperator(ProcessType type) {
@@ -321,18 +338,22 @@ public class JobServiceImpl implements JobService {
         } else if (type == STREAMING) {
             return streamingJobOp;
         }
-        throw new GriffinException.BadRequestException(MEASURE_TYPE_DOES_NOT_SUPPORT);
+        throw new GriffinException.BadRequestException(
+                MEASURE_TYPE_DOES_NOT_SUPPORT);
     }
 
-    TriggerKey getTriggerKeyIfValid(String qName, String qGroup) throws SchedulerException {
+    TriggerKey getTriggerKeyIfValid(String qName, String qGroup) throws
+            SchedulerException {
         TriggerKey triggerKey = triggerKey(qName, qGroup);
         if (factory.getScheduler().checkExists(triggerKey)) {
-            throw new GriffinException.ConflictException(QUARTZ_JOB_ALREADY_EXIST);
+            throw new GriffinException.ConflictException(
+                    QUARTZ_JOB_ALREADY_EXIST);
         }
         return triggerKey;
     }
 
-    List<? extends Trigger> getTriggers(String name, String group) throws SchedulerException {
+    List<? extends Trigger> getTriggers(String name, String group) throws
+            SchedulerException {
         if (name == null || group == null) {
             return null;
         }
@@ -341,7 +362,8 @@ public class JobServiceImpl implements JobService {
         return scheduler.getTriggersOfJob(jobKey);
     }
 
-    private JobState genJobState(AbstractJob job, String action) throws SchedulerException {
+    private JobState genJobState(AbstractJob job, String action) throws
+            SchedulerException {
         JobOperator op = getJobOperator(job);
         JobState state = op.getState(job, action);
         job.setJobState(state);
@@ -352,7 +374,8 @@ public class JobServiceImpl implements JobService {
         return genJobState(job, null);
     }
 
-    void addJob(TriggerKey tk, AbstractJob job, ProcessType type) throws Exception {
+    void addJob(TriggerKey tk, AbstractJob job, ProcessType type) throws
+            Exception {
         JobDetail jobDetail = addJobDetail(tk, job);
         Trigger trigger = genTriggerInstance(tk, jobDetail, job, type);
         factory.getScheduler().scheduleJob(trigger);
@@ -381,27 +404,35 @@ public class JobServiceImpl implements JobService {
 
 
     private GriffinMeasure getMeasureIfValid(Long measureId) {
-        GriffinMeasure measure = measureRepo.findByIdAndDeleted(measureId, false);
+        GriffinMeasure measure = measureRepo.findByIdAndDeleted(measureId,
+                false);
         if (measure == null) {
-            LOGGER.warn("The measure id {} isn't valid. Maybe it doesn't exist or is external measure type.", measureId);
+            LOGGER.warn("The measure id {} isn't valid. Maybe it doesn't " +
+                            "exist or is external measure type.",
+                    measureId);
             throw new GriffinException.BadRequestException(INVALID_MEASURE_ID);
         }
         return measure;
     }
 
-    private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, AbstractJob job, ProcessType type) {
+    private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd,
+            AbstractJob job, ProcessType type) {
         TriggerBuilder builder = newTrigger().withIdentity(tk).forJob(jd);
         if (type == BATCH) {
             TimeZone timeZone = getTimeZone(job.getTimeZone());
-            return builder.withSchedule(cronSchedule(job.getCronExpression()).inTimeZone(timeZone)).build();
+            return builder.withSchedule(cronSchedule(job.getCronExpression())
+                    .inTimeZone(timeZone)).build();
         } else if (type == STREAMING) {
-            return builder.startNow().withSchedule(simpleSchedule().withRepeatCount(0)).build();
+            return builder.startNow().withSchedule(simpleSchedule()
+                    .withRepeatCount(0)).build();
         }
-        throw new GriffinException.BadRequestException(JOB_TYPE_DOES_NOT_SUPPORT);
+        throw new GriffinException.BadRequestException(
+                JOB_TYPE_DOES_NOT_SUPPORT);
 
     }
 
-    private JobDetail addJobDetail(TriggerKey triggerKey, AbstractJob job) throws SchedulerException {
+    private JobDetail addJobDetail(TriggerKey triggerKey, AbstractJob job)
+            throws SchedulerException {
         Scheduler scheduler = factory.getScheduler();
         JobKey jobKey = jobKey(triggerKey.getName(), triggerKey.getGroup());
         JobDetail jobDetail;
@@ -409,7 +440,8 @@ public class JobServiceImpl implements JobService {
         if (isJobKeyExist) {
             jobDetail = scheduler.getJobDetail(jobKey);
         } else {
-            jobDetail = newJob(JobInstance.class).storeDurably().withIdentity(jobKey).build();
+            jobDetail = newJob(JobInstance.class).storeDurably()
+                    .withIdentity(jobKey).build();
         }
         setJobDataMap(jobDetail, job);
         scheduler.addJob(jobDetail, isJobKeyExist);
@@ -429,8 +461,10 @@ public class JobServiceImpl implements JobService {
      *
      * @param measureId measure id
      */
-    public void deleteJobsRelateToMeasure(Long measureId) throws SchedulerException {
-        List<AbstractJob> jobs = jobRepo.findByMeasureIdAndDeleted(measureId, false);
+    public void deleteJobsRelateToMeasure(Long measureId) throws
+            SchedulerException {
+        List<AbstractJob> jobs = jobRepo.findByMeasureIdAndDeleted(measureId,
+                false);
         if (CollectionUtils.isEmpty(jobs)) {
             LOGGER.info("Measure id {} has no related jobs.", measureId);
             return;
@@ -443,7 +477,8 @@ public class JobServiceImpl implements JobService {
 
     @Scheduled(fixedDelayString = "${jobInstance.fixedDelay.in.milliseconds}")
     public void syncInstancesOfAllJobs() {
-        LivySessionStates.State[] states = {STARTING, NOT_STARTED, RECOVERING, IDLE, RUNNING, BUSY};
+        LivySessionStates.State[] states = {STARTING, NOT_STARTED, RECOVERING,
+                IDLE, RUNNING, BUSY};
         List<JobInstanceBean> beans = instanceRepo.findByActiveState(states);
         for (JobInstanceBean jobInstance : beans) {
             syncInstancesOfJob(jobInstance);
@@ -451,7 +486,8 @@ public class JobServiceImpl implements JobService {
     }
 
     /**
-     * call livy to update part of job instance table data associated with group and jobName in mysql.
+     * call livy to update part of job instance table data associated with group
+     * and jobName in mysql.
      *
      * @param instance job instance livy info
      */
@@ -459,17 +495,22 @@ public class JobServiceImpl implements JobService {
         if (instance.getSessionId() == null) {
             return;
         }
-        String uri = env.getProperty("livy.uri") + "/" + instance.getSessionId();
-        TypeReference<HashMap<String, Object>> type = new TypeReference<HashMap<String, Object>>() {
-        };
+        String uri = env.getProperty("livy.uri") + "/"
+                + instance.getSessionId();
+        TypeReference<HashMap<String, Object>> type =
+                new TypeReference<HashMap<String, Object>>() {
+                };
         try {
             String resultStr = restTemplate.getForObject(uri, String.class);
-            HashMap<String, Object> resultMap = JsonUtil.toEntity(resultStr, type);
+            HashMap<String, Object> resultMap = JsonUtil.toEntity(resultStr,
+                    type);
             setJobInstanceIdAndUri(instance, resultMap);
         } catch (ResourceAccessException e) {
-            LOGGER.error("Your url may be wrong. Please check {}.\n {}", uri, e.getMessage());
+            LOGGER.error("Your url may be wrong. Please check {}.\n {}", uri,
+                    e.getMessage());
         } catch (HttpClientErrorException e) {
-            LOGGER.warn("sessionId({}) appId({}) {}.", instance.getSessionId(), instance.getAppId(), e.getMessage());
+            LOGGER.warn("sessionId({}) appId({}) {}.", instance.getSessionId(),
+                    instance.getAppId(), e.getMessage());
             setStateByYarn(instance, e);
         } catch (Exception e) {
             LOGGER.error(e.getMessage());
@@ -477,11 +518,14 @@ public class JobServiceImpl implements JobService {
 
     }
 
-    private void setStateByYarn(JobInstanceBean instance, HttpClientErrorException e) {
+    private void setStateByYarn(JobInstanceBean instance,
+                                HttpClientErrorException e) {
         if (!checkStatus(instance, e)) {
             int code = e.getStatusCode().value();
-            boolean match = (code == 400 || code == 404) && instance.getAppId() != null;
-            //this means your url is correct,but your param is wrong or livy session may be overdue.
+            boolean match = (code == 400 || code == 404)
+                    && instance.getAppId() != null;
+            // this means your url is correct, but your param is wrong or
+            // the livy session may be overdue.
             if (match) {
                 setStateByYarn(instance);
             }
@@ -490,21 +534,26 @@ public class JobServiceImpl implements JobService {
     }
 
     /**
-     * Check instance status in case that session id is overdue and app id is null and so we cannot update instance state.
+     * Check instance status in case that session id is overdue and app id is
+     * null and so we cannot update instance state.
+     *
      *
      * @param instance job instance bean
      * @param e        HttpClientErrorException
      * @return boolean
      */
-    private boolean checkStatus(JobInstanceBean instance, HttpClientErrorException e) {
+    private boolean checkStatus(JobInstanceBean instance,
+                                HttpClientErrorException e) {
         int code = e.getStatusCode().value();
         String appId = instance.getAppId();
         String responseBody = e.getResponseBodyAsString();
         Long sessionId = instance.getSessionId();
         sessionId = sessionId != null ? sessionId : -1;
-        // If code is 404 and appId is null and response body is like 'Session {id} not found',
-        // this means instance may not be scheduled for a long time by spark for too many tasks. It may be dead.
-        if (code == 404 && appId == null && (responseBody != null && responseBody.contains(sessionId.toString()))) {
+        // If code is 404 and appId is null and response body is like 'Session
+        // {id} not found', this means instance may not be scheduled for
+        // a long time by spark for too many tasks. It may be dead.
+        if (code == 404 && appId == null && (responseBody != null &&
+                responseBody.contains(sessionId.toString()))) {
             instance.setState(DEAD);
             instance.setDeleted(true);
             instanceRepo.save(instance);
@@ -514,7 +563,8 @@ public class JobServiceImpl implements JobService {
     }
 
     private void setStateByYarn(JobInstanceBean instance) {
-        LOGGER.warn("Spark session {} may be overdue! Now we use yarn to update state.", instance.getSessionId());
+        LOGGER.warn("Spark session {} may be overdue! " +
+                "Now we use yarn to update state.", instance.getSessionId());
         String yarnUrl = env.getProperty("yarn.uri");
         boolean success = YarnNetUtil.update(yarnUrl, instance);
         if (!success) {
@@ -527,34 +577,43 @@ public class JobServiceImpl implements JobService {
     }
 
 
-    private void setJobInstanceIdAndUri(JobInstanceBean instance, HashMap<String, Object> resultMap) {
+    private void setJobInstanceIdAndUri(JobInstanceBean instance,
+            HashMap<String, Object> resultMap) {
         if (resultMap != null) {
             Object state = resultMap.get("state");
             Object appId = resultMap.get("appId");
-            instance.setState(state == null ? null : LivySessionStates.State.valueOf(state.toString().toUpperCase()));
+            instance.setState(state == null ? null : LivySessionStates.State
+                    .valueOf(state.toString()
+                            .toUpperCase()));
             instance.setAppId(appId == null ? null : appId.toString());
-            instance.setAppUri(appId == null ? null : env.getProperty("yarn.uri") + "/cluster/app/" + appId);
+            instance.setAppUri(appId == null ? null : env
+                    .getProperty("yarn.uri") + "/cluster/app/" + appId);
             instanceRepo.save(instance);
         }
     }
 
     public Boolean isJobHealthy(Long jobId) {
         Pageable pageable = new PageRequest(0, 1, Sort.Direction.DESC, "tms");
-        List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId, pageable);
-        return !CollectionUtils.isEmpty(instances) && LivySessionStates.isHealthy(instances.get(0).getState());
+        List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId,
+                pageable);
+        return !CollectionUtils.isEmpty(instances) && LivySessionStates
+                .isHealthy(instances.get(0).getState());
     }
 
     @Override
     public String getJobHdfsSinksPath(String jobName, long timestamp) {
-        List<AbstractJob> jobList = jobRepo.findByJobNameAndDeleted(jobName, false);
+        List<AbstractJob> jobList = jobRepo.findByJobNameAndDeleted(
+            jobName, false);
         if (jobList.size() == 0) {
             return null;
         }
         if (jobList.get(0).getType().toLowerCase().equals("batch")) {
-            return getSinksPath(ENV_BATCH) + "/" + jobName + "/" + timestamp + "";
+            return getSinksPath(ENV_BATCH)
+                + "/" + jobName + "/" + timestamp + "";
         }
 
-        return getSinksPath(ENV_STREAMING) + "/" + jobName + "/" + timestamp + "";
+        return getSinksPath(ENV_STREAMING)
+            + "/" + jobName + "/" + timestamp + "";
     }
 
     private String getSinksPath(String jsonString) {
@@ -563,8 +622,10 @@ public class JobServiceImpl implements JobService {
             JSONArray persistArray = obj.getJSONArray("sinks");
             for (int i = 0; i < persistArray.length(); i++) {
                 Object type = persistArray.getJSONObject(i).get("type");
-                if (type instanceof String && "hdfs".equalsIgnoreCase(String.valueOf(type))) {
-                    return persistArray.getJSONObject(i).getJSONObject("config").getString("path");
+                if (type instanceof String
+                    && "hdfs".equalsIgnoreCase(String.valueOf(type))) {
+                    return persistArray.getJSONObject(i)
+                        .getJSONObject("config").getString("path");
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
index 19e746f..64478b8 100644
--- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
@@ -68,7 +68,8 @@ import org.springframework.web.client.RestTemplate;
 @DisallowConcurrentExecution
 @Component
 public class SparkSubmitJob implements Job {
-    private static final Logger LOGGER = LoggerFactory.getLogger(SparkSubmitJob.class);
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(SparkSubmitJob.class);
 
     @Autowired
     private JobInstanceRepo jobInstanceRepo;
@@ -99,7 +100,8 @@ public class SparkSubmitJob implements Job {
         }
     }
 
-    private void updateJobInstanceState(JobExecutionContext context) throws IOException {
+    private void updateJobInstanceState(JobExecutionContext context) throws
+            IOException {
         SimpleTrigger simpleTrigger = (SimpleTrigger) context.getTrigger();
         int repeatCount = simpleTrigger.getRepeatCount();
         int fireCount = simpleTrigger.getTimesTriggered();
@@ -111,10 +113,13 @@ public class SparkSubmitJob implements Job {
     private String post2Livy() {
         String result = null;
         try {
-            result = restTemplate.postForObject(livyUri, livyConfMap, String.class);
+            result = restTemplate.postForObject(livyUri, livyConfMap,
+                    String.class);
             LOGGER.info(result);
         } catch (HttpClientErrorException e) {
-            LOGGER.error("Post to livy ERROR. \n {} {}", e.getMessage(), e.getResponseBodyAsString());
+            LOGGER.error("Post to livy ERROR. \n {} {}",
+                    e.getMessage(),
+                    e.getResponseBodyAsString());
         } catch (Exception e) {
             LOGGER.error("Post to livy ERROR. {}", e.getMessage());
         }
@@ -126,7 +131,8 @@ public class SparkSubmitJob implements Job {
             return true;
         }
         for (SegmentPredicate segPredicate : predicates) {
-            Predicator predicator = PredicatorFactory.newPredicateInstance(segPredicate);
+            Predicator predicator = PredicatorFactory
+                    .newPredicateInstance(segPredicate);
             try {
                 if (predicator != null && !predicator.predicate()) {
                     return false;
@@ -141,11 +147,14 @@ public class SparkSubmitJob implements Job {
 
     private void initParam(JobDetail jd) throws IOException {
         mPredicates = new ArrayList<>();
-        jobInstance = jobInstanceRepo.findByPredicateName(jd.getJobDataMap().getString(PREDICATE_JOB_NAME));
-        measure = toEntity(jd.getJobDataMap().getString(MEASURE_KEY), GriffinMeasure.class);
+        jobInstance = jobInstanceRepo.findByPredicateName(jd.getJobDataMap()
+                .getString(PREDICATE_JOB_NAME));
+        measure = toEntity(jd.getJobDataMap().getString(MEASURE_KEY),
+                GriffinMeasure.class);
         livyUri = env.getProperty("livy.uri");
         setPredicates(jd.getJobDataMap().getString(PREDICATES_KEY));
-        // in order to keep metric name unique, we set job name as measure name at present
+        // in order to keep metric name unique, we set job name
+        // as measure name at present
         measure.setName(jd.getJobDataMap().getString(JOB_NAME));
     }
 
@@ -154,8 +163,9 @@ public class SparkSubmitJob implements Job {
         if (StringUtils.isEmpty(json)) {
             return;
         }
-        List<Map<String, Object>> maps = toEntity(json, new TypeReference<List<Map>>() {
-        });
+        List<Map<String, Object>> maps = toEntity(json,
+                new TypeReference<List<Map>>() {
+                });
         for (Map<String, Object> map : maps) {
             SegmentPredicate sp = new SegmentPredicate();
             sp.setType((String) map.get("type"));
@@ -195,8 +205,10 @@ public class SparkSubmitJob implements Job {
     }
 
 
-    private void saveJobInstance(JobDetail jd) throws SchedulerException, IOException {
-        // If result is null, it may livy uri is wrong or livy parameter is wrong.
+    private void saveJobInstance(JobDetail jd) throws SchedulerException,
+            IOException {
+        // If result is null, the livy uri may be wrong
+        // or the livy parameter may be wrong.
         String result = post2Livy();
         String group = jd.getKey().getGroup();
         String name = jd.getKey().getName();
@@ -205,9 +217,11 @@ public class SparkSubmitJob implements Job {
         saveJobInstance(result, FOUND);
     }
 
-    private void saveJobInstance(String result, State state) throws IOException {
-        TypeReference<HashMap<String, Object>> type = new TypeReference<HashMap<String, Object>>() {
-        };
+    private void saveJobInstance(String result, State state)
+            throws IOException {
+        TypeReference<HashMap<String, Object>> type =
+                new TypeReference<HashMap<String, Object>>() {
+                };
         Map<String, Object> resultMap = null;
         if (result != null) {
             resultMap = toEntity(result, type);
@@ -223,8 +237,10 @@ public class SparkSubmitJob implements Job {
             Object status = resultMap.get("state");
             Object id = resultMap.get("id");
             Object appId = resultMap.get("appId");
-            jobInstance.setState(status == null ? null : State.valueOf(status.toString().toUpperCase()));
-            jobInstance.setSessionId(id == null ? null : Long.parseLong(id.toString()));
+            jobInstance.setState(status == null ? null : State.valueOf(status
+                    .toString().toUpperCase()));
+            jobInstance.setSessionId(id == null ? null : Long.parseLong(id
+                    .toString()));
             jobInstance.setAppId(appId == null ? null : appId.toString());
         }
     }