You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2019/08/20 13:59:23 UTC

[griffin] branch master updated: [GRIFFIN-271] Before 0.6.0 milestone release, We need to check and tr…

This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new 255f147  [GRIFFIN-271] Before 0.6.0 milestone release, We need to check and tr…
255f147 is described below

commit 255f147c4484e9077083bc74b0a3af9bd83d4afb
Author: Eugene <li...@apache.org>
AuthorDate: Tue Aug 20 21:59:03 2019 +0800

    [GRIFFIN-271] Before 0.6.0 milestone release, We need to check and tr…
    
    …im current codes against apache griffin dev guide
    
    trim current java code by dev guide document
    
    Author: Eugene <li...@apache.org>
    
    Closes #522 from toyboxman/g271.
---
 .../apache/griffin/core/GriffinWebApplication.java |   4 +-
 .../griffin/core/common/SimpleCORSFilter.java      |   6 +-
 .../griffin/core/config/EclipseLinkJpaConfig.java  |   8 +-
 .../org/apache/griffin/core/config/EnvConfig.java  |  10 +-
 .../apache/griffin/core/config/LoginConfig.java    |   2 +-
 .../griffin/core/config/PropertiesConfig.java      |  25 ++--
 .../griffin/core/config/SchedulerConfig.java       |   3 +-
 .../griffin/core/event/GriffinEventManager.java    |  10 +-
 .../org/apache/griffin/core/event/JobEvent.java    |  24 ++--
 .../core/exception/GriffinExceptionHandler.java    |  16 +--
 .../core/exception/GriffinExceptionMessage.java    |  14 +--
 .../griffin/core/job/BatchJobOperatorImpl.java     |  56 ++++-----
 .../griffin/core/job/FileExistPredicator.java      |  11 +-
 .../org/apache/griffin/core/job/JobController.java |  24 ++--
 .../org/apache/griffin/core/job/JobInstance.java   |  77 ++++++-------
 .../org/apache/griffin/core/job/JobOperator.java   |   6 +-
 .../apache/griffin/core/job/JobServiceImpl.java    |   7 +-
 .../griffin/core/job/LivyTaskSubmitHelper.java     | 128 ++++++++++-----------
 .../org/apache/griffin/core/job/Predicator.java    |  10 ++
 .../apache/griffin/core/job/SparkSubmitJob.java    |  85 ++++++++------
 .../griffin/core/job/StreamingJobOperatorImpl.java |  70 +++++------
 .../griffin/core/job/entity/AbstractJob.java       |  23 ++--
 .../griffin/core/job/entity/JobDataSegment.java    |   6 +-
 .../griffin/core/job/entity/JobInstanceBean.java   |   2 +-
 .../griffin/core/job/entity/LivySessionStates.java |  14 +--
 .../griffin/core/job/entity/SegmentPredicate.java  |   9 +-
 .../griffin/core/job/entity/SegmentRange.java      |   2 -
 .../factory/AutowiringSpringBeanJobFactory.java    |  14 ++-
 .../core/job/factory/PredicatorFactory.java        |   4 +-
 .../griffin/core/job/repo/JobInstanceRepo.java     |   4 +-
 .../org/apache/griffin/core/job/repo/JobRepo.java  |   2 +-
 .../apache/griffin/core/login/LoginController.java |   2 +-
 .../apache/griffin/core/login/LoginService.java    |   8 ++
 .../core/login/LoginServiceDefaultImpl.java        |   3 +-
 .../griffin/core/login/LoginServiceLdapImpl.java   |  14 +--
 .../core/login/ldap/SelfSignedSocketFactory.java   |   2 +-
 .../core/measure/ExternalMeasureOperatorImpl.java  |   4 +-
 .../griffin/core/measure/MeasureController.java    |   6 +-
 .../griffin/core/measure/MeasureOrgService.java    |   4 +-
 .../core/measure/MeasureOrgServiceImpl.java        |  12 +-
 .../griffin/core/measure/MeasureServiceImpl.java   |  18 +--
 .../measure/entity/AbstractAuditableEntity.java    |   4 +
 .../griffin/core/measure/entity/DataSource.java    |   2 +-
 .../griffin/core/measure/entity/EvaluateRule.java  |   2 +-
 .../griffin/core/measure/entity/Measure.java       |   6 +-
 .../apache/griffin/core/measure/entity/Rule.java   |   4 +-
 .../core/measure/entity/StreamingPreProcess.java   |   4 +-
 .../griffin/core/measure/repo/MeasureRepo.java     |   6 +-
 .../core/metastore/hive/HiveMetaStoreProxy.java    |   4 +-
 .../metastore/hive/HiveMetaStoreServiceImpl.java   |  24 ++--
 .../hive/HiveMetaStoreServiceJdbcImpl.java         |  44 +++----
 .../metastore/kafka/KafkaSchemaController.java     |   2 +-
 .../metastore/kafka/KafkaSchemaServiceImpl.java    |  14 +--
 .../griffin/core/metric/MetricController.java      |  16 +--
 .../griffin/core/metric/MetricServiceImpl.java     |  34 +++---
 .../apache/griffin/core/metric/MetricStore.java    |   2 +-
 .../griffin/core/metric/MetricStoreImpl.java       |  86 +++++++-------
 .../griffin/core/metric/model/MetricValue.java     |  10 +-
 .../java/org/apache/griffin/core/util/FSUtil.java  |  15 +--
 .../org/apache/griffin/core/util/FileUtil.java     |   4 +-
 .../org/apache/griffin/core/util/JsonUtil.java     |  16 +--
 .../org/apache/griffin/core/util/MeasureUtil.java  |  10 +-
 .../apache/griffin/core/util/PropertiesUtil.java   |   4 +-
 .../org/apache/griffin/core/util/TimeUtil.java     |  18 +--
 .../org/apache/griffin/core/util/YarnNetUtil.java  |  14 +--
 .../griffin/core/job/SparkSubmitJobTest.java       |   7 +-
 66 files changed, 565 insertions(+), 536 deletions(-)

diff --git a/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java b/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java
index 33fc049..62bdc1f 100644
--- a/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java
+++ b/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java
@@ -18,7 +18,6 @@ under the License.
 */
 package org.apache.griffin.core;
 
-
 import org.apache.griffin.core.common.SimpleCORSFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,12 +26,11 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.context.annotation.Bean;
 import org.springframework.scheduling.annotation.EnableScheduling;
 
-
 @SpringBootApplication
 @EnableScheduling
 public class GriffinWebApplication {
     private static final Logger LOGGER = LoggerFactory
-            .getLogger(GriffinWebApplication.class);
+        .getLogger(GriffinWebApplication.class);
 
     public static void main(String[] args) {
         SpringApplication.run(GriffinWebApplication.class, args);
diff --git a/service/src/main/java/org/apache/griffin/core/common/SimpleCORSFilter.java b/service/src/main/java/org/apache/griffin/core/common/SimpleCORSFilter.java
index bd39188..6c8e8c3 100644
--- a/service/src/main/java/org/apache/griffin/core/common/SimpleCORSFilter.java
+++ b/service/src/main/java/org/apache/griffin/core/common/SimpleCORSFilter.java
@@ -32,14 +32,14 @@ public class SimpleCORSFilter implements Filter {
     public void doFilter(final ServletRequest req,
                          final ServletResponse res,
                          final FilterChain chain)
-            throws IOException, ServletException {
+        throws IOException, ServletException {
         HttpServletResponse response = (HttpServletResponse) res;
         response.setHeader("Access-Control-Allow-Origin", "*");
         response.setHeader("Access-Control-Allow-Methods",
-                "POST, GET, OPTIONS, DELETE,PUT");
+            "POST, GET, OPTIONS, DELETE,PUT");
         response.setHeader("Access-Control-Max-Age", "3600");
         response.setHeader("Access-Control-Allow-Headers",
-                "X-PINGOTHER, Origin, X-Requested-With, Content-Type, Accept");
+            "X-PINGOTHER, Origin, X-Requested-With, Content-Type, Accept");
         chain.doFilter(req, res);
     }
 
diff --git a/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java b/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java
index 1493569..1463d54 100644
--- a/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java
@@ -37,9 +37,9 @@ import org.springframework.transaction.jta.JtaTransactionManager;
 @ComponentScan("org.apache.griffin.core")
 public class EclipseLinkJpaConfig extends JpaBaseConfiguration {
     protected EclipseLinkJpaConfig(
-            DataSource ds, JpaProperties properties,
-            ObjectProvider<JtaTransactionManager> jtm,
-            ObjectProvider<TransactionManagerCustomizers> tmc) {
+        DataSource ds, JpaProperties properties,
+        ObjectProvider<JtaTransactionManager> jtm,
+        ObjectProvider<TransactionManagerCustomizers> tmc) {
         super(ds, properties, jtm, tmc);
     }
 
@@ -53,7 +53,7 @@ public class EclipseLinkJpaConfig extends JpaBaseConfiguration {
         Map<String, Object> map = new HashMap<>();
         map.put(PersistenceUnitProperties.WEAVING, "false");
         map.put(PersistenceUnitProperties.DDL_GENERATION,
-                "create-or-extend-tables");
+            "create-or-extend-tables");
         return map;
     }
 }
diff --git a/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java b/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java
index 8c075a4..ef303a1 100644
--- a/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java
@@ -35,7 +35,7 @@ import org.springframework.core.io.ClassPathResource;
 
 public class EnvConfig {
     private static final Logger LOGGER = LoggerFactory
-            .getLogger(EnvConfig.class);
+        .getLogger(EnvConfig.class);
     public static String ENV_BATCH;
     public static String ENV_STREAMING;
 
@@ -47,7 +47,7 @@ public class EnvConfig {
      * @throws IOException io exception
      */
     private static String readEnvFromResource(String path)
-            throws IOException {
+        throws IOException {
         if (path == null) {
             LOGGER.warn("Parameter path is null.");
             return null;
@@ -75,7 +75,7 @@ public class EnvConfig {
      * @throws IOException io exception
      */
     private static String readEnvFromAbsolutePath(String path)
-            throws IOException {
+        throws IOException {
         if (path == null) {
             LOGGER.warn("Parameter path is null.");
             return null;
@@ -103,7 +103,7 @@ public class EnvConfig {
      * @throws IOException io exception
      */
     static String getBatchEnv(String name, String defaultPath, String location)
-            throws IOException {
+        throws IOException {
         if (ENV_BATCH != null) {
             return ENV_BATCH;
         }
@@ -121,7 +121,7 @@ public class EnvConfig {
     static String getStreamingEnv(String name,
                                   String defaultPath,
                                   String location)
-            throws IOException {
+        throws IOException {
         if (ENV_STREAMING != null) {
             return ENV_STREAMING;
         }
diff --git a/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java b/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java
index 8c9e4a2..c362755 100644
--- a/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java
@@ -53,7 +53,7 @@ public class LoginConfig {
                 return new LoginServiceDefaultImpl();
             case "ldap":
                 return new LoginServiceLdapImpl(url, email, searchBase,
-                        searchPattern, sslSkipVerify, bindDN, bindPassword);
+                    searchPattern, sslSkipVerify, bindDN, bindPassword);
             default:
                 return null;
         }
diff --git a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
index ebbd6cf..53c0dfc 100644
--- a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
@@ -42,11 +42,18 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.core.io.ClassPathResource;
 
+/**
+ * PropertiesConfig is responsible for initializing configuration objects
+ * from property files.
+ *
+ * @see EnvConfig
+ * @see org.apache.griffin.core.util.PropertiesUtil
+ */
 @Configuration
 public class PropertiesConfig {
 
     private static final Logger LOGGER = LoggerFactory
-            .getLogger(PropertiesConfig.class);
+        .getLogger(PropertiesConfig.class);
 
     public static Map<String, Object> livyConfMap;
 
@@ -55,12 +62,12 @@ public class PropertiesConfig {
     private String envLocation;
 
     public PropertiesConfig(
-            @Value("${external.config.location}") String configLocation,
-            @Value("${external.env.location}") String envLocation) {
+        @Value("${external.config.location}") String configLocation,
+        @Value("${external.env.location}") String envLocation) {
         LOGGER.info("external.config.location : {}",
-                configLocation != null ? configLocation : "null");
+            configLocation != null ? configLocation : "null");
         LOGGER.info("external.env.location : {}",
-                envLocation != null ? envLocation : "null");
+            envLocation != null ? envLocation : "null");
         this.configLocation = configLocation;
         this.envLocation = envLocation;
     }
@@ -93,9 +100,9 @@ public class PropertiesConfig {
     }
 
     private static void genLivyConf(
-            String name,
-            String defaultPath,
-            String location) throws IOException {
+        String name,
+        String defaultPath,
+        String location) throws IOException {
         if (livyConfMap != null) {
             return;
         }
@@ -117,7 +124,7 @@ public class PropertiesConfig {
      * @throws IOException io exception
      */
     private static Map<String, Object> readPropertiesFromResource(String path)
-            throws IOException {
+        throws IOException {
         if (path == null) {
             LOGGER.warn("Parameter path is null.");
             return null;
diff --git a/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java b/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java
index 7b6af51..843e321 100644
--- a/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java
@@ -44,7 +44,7 @@ public class SchedulerConfig {
     @Bean
     public JobFactory jobFactory(ApplicationContext applicationContext) {
         AutowiringSpringBeanJobFactory jobFactory =
-                new AutowiringSpringBeanJobFactory();
+            new AutowiringSpringBeanJobFactory();
         jobFactory.setApplicationContext(applicationContext);
         return jobFactory;
     }
@@ -60,5 +60,4 @@ public class SchedulerConfig {
         return factory;
     }
 
-
 }
diff --git a/service/src/main/java/org/apache/griffin/core/event/GriffinEventManager.java b/service/src/main/java/org/apache/griffin/core/event/GriffinEventManager.java
index 2ebad57..e0dee89 100644
--- a/service/src/main/java/org/apache/griffin/core/event/GriffinEventManager.java
+++ b/service/src/main/java/org/apache/griffin/core/event/GriffinEventManager.java
@@ -43,11 +43,11 @@ public class GriffinEventManager {
     void initializeListeners() {
         List<GriffinHook> eventListeners = new ArrayList<>();
         applicationContext.getBeansOfType(GriffinHook.class)
-                .forEach((beanName, listener) -> {
-                    if (enabledListeners.contains(beanName)) {
-                        eventListeners.add(listener);
-                    }
-                });
+            .forEach((beanName, listener) -> {
+                if (enabledListeners.contains(beanName)) {
+                    eventListeners.add(listener);
+                }
+            });
         this.eventListeners = eventListeners;
     }
 
diff --git a/service/src/main/java/org/apache/griffin/core/event/JobEvent.java b/service/src/main/java/org/apache/griffin/core/event/JobEvent.java
index 5fdea0f..467cf28 100644
--- a/service/src/main/java/org/apache/griffin/core/event/JobEvent.java
+++ b/service/src/main/java/org/apache/griffin/core/event/JobEvent.java
@@ -32,29 +32,29 @@ public class JobEvent extends GriffinAbstractEvent<AbstractJob> {
 
     public static JobEvent yieldJobEventBeforeCreation(AbstractJob source) {
         return new JobEvent(source,
-                EventType.CREATION_EVENT,
-                EventSourceType.JOB,
-                EventPointcutType.BEFORE);
+            EventType.CREATION_EVENT,
+            EventSourceType.JOB,
+            EventPointcutType.BEFORE);
     }
 
     public static JobEvent yieldJobEventAfterCreation(AbstractJob source) {
         return new JobEvent(source,
-                EventType.CREATION_EVENT,
-                EventSourceType.JOB,
-                EventPointcutType.AFTER);
+            EventType.CREATION_EVENT,
+            EventSourceType.JOB,
+            EventPointcutType.AFTER);
     }
 
     public static JobEvent yieldJobEventBeforeRemoval(AbstractJob source) {
         return new JobEvent(source,
-                EventType.REMOVAL_EVENT,
-                EventSourceType.JOB,
-                EventPointcutType.BEFORE);
+            EventType.REMOVAL_EVENT,
+            EventSourceType.JOB,
+            EventPointcutType.BEFORE);
     }
 
     public static JobEvent yieldJobEventAfterRemoval(AbstractJob source) {
         return new JobEvent(source,
-                EventType.REMOVAL_EVENT,
-                EventSourceType.JOB,
-                EventPointcutType.AFTER);
+            EventType.REMOVAL_EVENT,
+            EventSourceType.JOB,
+            EventPointcutType.AFTER);
     }
 }
diff --git a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java
index d987da7..5fb88fb 100644
--- a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java
+++ b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java
@@ -34,28 +34,28 @@ public class GriffinExceptionHandler {
     @SuppressWarnings("rawtypes")
     @ExceptionHandler(GriffinException.ServiceException.class)
     public ResponseEntity handleGriffinExceptionOfServer(
-            HttpServletRequest request,
-            GriffinException.ServiceException e) {
+        HttpServletRequest request,
+        GriffinException.ServiceException e) {
         String message = e.getMessage();
         Throwable cause = e.getCause();
         GriffinExceptionResponse body = new GriffinExceptionResponse(
-                HttpStatus.INTERNAL_SERVER_ERROR,
-                message, request.getRequestURI(), cause.getClass().getName());
+            HttpStatus.INTERNAL_SERVER_ERROR,
+            message, request.getRequestURI(), cause.getClass().getName());
         return new ResponseEntity<>(body, HttpStatus.INTERNAL_SERVER_ERROR);
     }
 
     @SuppressWarnings("rawtypes")
     @ExceptionHandler(GriffinException.class)
     public ResponseEntity handleGriffinExceptionOfClient(
-            HttpServletRequest request, GriffinException e) {
+        HttpServletRequest request, GriffinException e) {
         ResponseStatus responseStatus = AnnotationUtils.findAnnotation(
-                e.getClass(), ResponseStatus.class);
+            e.getClass(), ResponseStatus.class);
         HttpStatus status = responseStatus.code();
         String code = e.getMessage();
         GriffinExceptionMessage message = GriffinExceptionMessage
-                .valueOf(Integer.valueOf(code));
+            .valueOf(Integer.valueOf(code));
         GriffinExceptionResponse body = new GriffinExceptionResponse(
-                status, message, request.getRequestURI());
+            status, message, request.getRequestURI());
         return new ResponseEntity<>(body, status);
     }
 }
diff --git a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
index 4f65248..5a39c66 100644
--- a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
+++ b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
@@ -23,9 +23,9 @@ public enum GriffinExceptionMessage {
 
     //400, "Bad Request"
     MEASURE_TYPE_DOES_NOT_MATCH(40001, "Property 'measure.type' does not match"
-            + "the type of measure in request body"),
+        + "the type of measure in request body"),
     INVALID_CONNECTOR_NAME(40002, "Property 'name' in 'connectors' " +
-            "field is invalid"),
+        "field is invalid"),
     MISSING_METRIC_NAME(40003, "Missing property 'metricName'"),
     INVALID_JOB_NAME(40004, "Property 'job.name' is invalid"),
     MISSING_BASELINE_CONFIG(40005, "Missing 'as.baseline' config in 'data.segments'"),
@@ -45,9 +45,9 @@ public enum GriffinExceptionMessage {
     JOB_TYPE_DOES_NOT_SUPPORT(40011, "We don't support such job type."),
 
     STREAMING_JOB_IS_RUNNING(40012, "There is no need to start again " +
-            "as job is RUNNING."),
+        "as job is RUNNING."),
     STREAMING_JOB_IS_STOPPED(40012, "There is no need to stop again " +
-            "as job is STOPPED."),
+        "as job is STOPPED."),
     JOB_IS_NOT_SCHEDULED(40013, "The job isn't scheduled."),
 
     JOB_IS_NOT_IN_PAUSED_STATUS(40014, "The job isn't in paused status."),
@@ -66,9 +66,9 @@ public enum GriffinExceptionMessage {
     NO_SUCH_JOB_ACTION(40404, "No such job action"),
 
     JOB_KEY_DOES_NOT_EXIST(40405, "Job key which consists of " +
-            "group and name does not exist."),
+        "group and name does not exist."),
     ORGANIZATION_NAME_DOES_NOT_EXIST(40406, "Organization name " +
-            "does not exist"),
+        "does not exist"),
 
     HDFS_FILE_NOT_EXIST(40407, "Hadoop data file not exist"),
 
@@ -101,7 +101,7 @@ public enum GriffinExceptionMessage {
             }
         }
         throw new IllegalArgumentException("No matching constant for ["
-                + code + "]");
+            + code + "]");
     }
 
 
diff --git a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
index 0210822..cdaa68c 100644
--- a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
@@ -71,7 +71,7 @@ import org.springframework.util.StringUtils;
 @Service
 public class BatchJobOperatorImpl implements JobOperator {
     private static final Logger LOGGER = LoggerFactory
-            .getLogger(BatchJobOperatorImpl.class);
+        .getLogger(BatchJobOperatorImpl.class);
 
     @Autowired
     private SchedulerFactoryBean factory;
@@ -85,7 +85,7 @@ public class BatchJobOperatorImpl implements JobOperator {
     @Override
     @Transactional(rollbackFor = Exception.class)
     public AbstractJob add(AbstractJob job, GriffinMeasure measure)
-            throws Exception {
+        throws Exception {
         validateParams(job, measure);
         String qName = jobService.getQuartzName(job);
         String qGroup = jobService.getQuartzGroup();
@@ -120,20 +120,20 @@ public class BatchJobOperatorImpl implements JobOperator {
         TriggerState state = getTriggerState(name, group);
         if (state == null) {
             throw new GriffinException.BadRequestException(
-                    JOB_IS_NOT_SCHEDULED);
+                JOB_IS_NOT_SCHEDULED);
         }
         /* If job is not in paused state,we can't start it
         as it may be RUNNING.*/
         if (state != PAUSED) {
             throw new GriffinException.BadRequestException
-                    (JOB_IS_NOT_IN_PAUSED_STATUS);
+                (JOB_IS_NOT_IN_PAUSED_STATUS);
         }
         JobKey jobKey = jobKey(name, group);
         try {
             factory.getScheduler().resumeJob(jobKey);
         } catch (SchedulerException e) {
             throw new GriffinException.ServiceException(
-                    "Failed to start job.", e);
+                "Failed to start job.", e);
         }
     }
 
@@ -151,14 +151,14 @@ public class BatchJobOperatorImpl implements JobOperator {
 
     @Override
     public JobHealth getHealth(JobHealth jobHealth, AbstractJob job)
-            throws SchedulerException {
+        throws SchedulerException {
         List<? extends Trigger> triggers = jobService
-                .getTriggers(job.getName(), job.getGroup());
+            .getTriggers(job.getName(), job.getGroup());
         if (!CollectionUtils.isEmpty(triggers)) {
             jobHealth.setJobCount(jobHealth.getJobCount() + 1);
             if (jobService.isJobHealthy(job.getId())) {
                 jobHealth.setHealthyJobCount(
-                        jobHealth.getHealthyJobCount() + 1);
+                    jobHealth.getHealthyJobCount() + 1);
             }
         }
         return jobHealth;
@@ -166,7 +166,7 @@ public class BatchJobOperatorImpl implements JobOperator {
 
     @Override
     public JobState getState(AbstractJob job, String action)
-            throws SchedulerException {
+        throws SchedulerException {
         JobState jobState = new JobState();
         Scheduler scheduler = factory.getScheduler();
         if (job.getGroup() == null || job.getName() == null) {
@@ -182,9 +182,9 @@ public class BatchJobOperatorImpl implements JobOperator {
     }
 
     private void setTriggerTime(AbstractJob job, JobState jobState)
-            throws SchedulerException {
+        throws SchedulerException {
         List<? extends Trigger> triggers = jobService
-                .getTriggers(job.getName(), job.getGroup());
+            .getTriggers(job.getName(), job.getGroup());
         // If triggers are empty, in Griffin it means job is completed whose
         // trigger state is NONE or not scheduled.
         if (CollectionUtils.isEmpty(triggers)) {
@@ -194,9 +194,9 @@ public class BatchJobOperatorImpl implements JobOperator {
         Date nextFireTime = trigger.getNextFireTime();
         Date previousFireTime = trigger.getPreviousFireTime();
         jobState.setNextFireTime(nextFireTime != null ?
-                nextFireTime.getTime() : -1);
+            nextFireTime.getTime() : -1);
         jobState.setPreviousFireTime(previousFireTime != null ?
-                previousFireTime.getTime() : -1);
+            previousFireTime.getTime() : -1);
     }
 
     /**
@@ -225,7 +225,7 @@ public class BatchJobOperatorImpl implements JobOperator {
     private TriggerState getTriggerState(String name, String group) {
         try {
             List<? extends Trigger> triggers = jobService.getTriggers(name,
-                    group);
+                group);
             if (CollectionUtils.isEmpty(triggers)) {
                 return null;
             }
@@ -234,7 +234,7 @@ public class BatchJobOperatorImpl implements JobOperator {
         } catch (SchedulerException e) {
             LOGGER.error("Failed to delete job", e);
             throw new GriffinException
-                    .ServiceException("Failed to delete job", e);
+                .ServiceException("Failed to delete job", e);
         }
 
     }
@@ -254,7 +254,7 @@ public class BatchJobOperatorImpl implements JobOperator {
         } catch (Exception e) {
             LOGGER.error("Job schedule happens exception.", e);
             throw new GriffinException.ServiceException("Job schedule " +
-                    "happens exception.", e);
+                "happens exception.", e);
         }
     }
 
@@ -263,7 +263,7 @@ public class BatchJobOperatorImpl implements JobOperator {
         for (JobInstanceBean instance : instances) {
             if (!instance.isPredicateDeleted()) {
                 deleteJob(instance.getPredicateGroup(), instance
-                        .getPredicateName());
+                    .getPredicateName());
                 instance.setPredicateDeleted(true);
                 if (instance.getState().equals(LivySessionStates.State.FINDING)) {
                     instance.setState(LivySessionStates.State.NOT_FOUND);
@@ -277,7 +277,7 @@ public class BatchJobOperatorImpl implements JobOperator {
         JobKey jobKey = new JobKey(name, group);
         if (!scheduler.checkExists(jobKey)) {
             LOGGER.info("Job({},{}) does not exist.", jobKey.getGroup(), jobKey
-                    .getName());
+                .getName());
             return;
         }
         scheduler.deleteJob(jobKey);
@@ -292,9 +292,9 @@ public class BatchJobOperatorImpl implements JobOperator {
         JobKey jobKey = new JobKey(name, group);
         if (!scheduler.checkExists(jobKey)) {
             LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), jobKey
-                    .getName());
+                .getName());
             throw new GriffinException.NotFoundException
-                    (JOB_KEY_DOES_NOT_EXIST);
+                (JOB_KEY_DOES_NOT_EXIST);
         }
         scheduler.pauseJob(jobKey);
     }
@@ -326,7 +326,7 @@ public class BatchJobOperatorImpl implements JobOperator {
             }
         } catch (SchedulerException e) {
             LOGGER.error("Failed to pause predicate job({},{}).", pGroup,
-                    pName);
+                pName);
             status = false;
         }
         return status;
@@ -338,16 +338,16 @@ public class BatchJobOperatorImpl implements JobOperator {
         }
         if (!isValidCronExpression(job.getCronExpression())) {
             throw new GriffinException.BadRequestException
-                    (INVALID_CRON_EXPRESSION);
+                (INVALID_CRON_EXPRESSION);
         }
         if (!isValidBaseLine(job.getSegments())) {
             throw new GriffinException.BadRequestException
-                    (MISSING_BASELINE_CONFIG);
+                (MISSING_BASELINE_CONFIG);
         }
         List<String> names = getConnectorNames(measure);
         if (!isValidConnectorNames(job.getSegments(), names)) {
             throw new GriffinException.BadRequestException
-                    (INVALID_CONNECTOR_NAME);
+                (INVALID_CONNECTOR_NAME);
         }
     }
 
@@ -371,7 +371,7 @@ public class BatchJobOperatorImpl implements JobOperator {
             }
         }
         LOGGER.warn("Please set segment timestamp baseline " +
-                "in as.baseline field.");
+            "in as.baseline field.");
         return false;
     }
 
@@ -383,16 +383,16 @@ public class BatchJobOperatorImpl implements JobOperator {
             String dcName = segment.getDataConnectorName();
             sets.add(dcName);
             boolean exist = names.stream().anyMatch(name -> name.equals
-                    (dcName));
+                (dcName));
             if (!exist) {
                 LOGGER.warn("Param {} is a illegal string. " +
-                        "Please input one of strings in {}.", dcName, names);
+                    "Please input one of strings in {}.", dcName, names);
                 return false;
             }
         }
         if (sets.size() < segments.size()) {
             LOGGER.warn("Connector names in job data segment " +
-                    "cannot duplicate.");
+                "cannot duplicate.");
             return false;
         }
         return true;
diff --git a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
index 703a837..1a9209b 100644
--- a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
+++ b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
 
 public class FileExistPredicator implements Predicator {
     private static final Logger LOGGER = LoggerFactory
-            .getLogger(FileExistPredicator.class);
+        .getLogger(FileExistPredicator.class);
 
     private static final String PREDICT_PATH = "path";
     private static final String PREDICT_ROOT_PATH = "root.path";
@@ -49,15 +49,14 @@ public class FileExistPredicator implements Predicator {
         Map<String, Object> config = predicate.getConfigMap();
         String[] paths = null;
         String rootPath = null;
-        if (config != null && !StringUtils.isEmpty((String) config.get
-                (PREDICT_PATH))) {
-            paths = ((String) config.get(PREDICT_PATH)).split
-                    (PATH_CONNECTOR_CHARACTER);
+        if (config != null && !StringUtils.isEmpty((String) config.get(PREDICT_PATH))) {
+            paths = ((String) config.get(PREDICT_PATH))
+                .split(PATH_CONNECTOR_CHARACTER);
             rootPath = (String) config.get(PREDICT_ROOT_PATH);
         }
         if (ArrayUtils.isEmpty(paths) || StringUtils.isEmpty(rootPath)) {
             LOGGER.error("Predicate path is null.Please check predicates " +
-                    "config root.path and path.");
+                "config root.path and path.");
             throw new NullPointerException();
         }
         for (String path : paths) {
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobController.java b/service/src/main/java/org/apache/griffin/core/job/JobController.java
index aca90d2..3b52ee1 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobController.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobController.java
@@ -51,7 +51,7 @@ public class JobController {
 
     @RequestMapping(value = "/jobs", method = RequestMethod.GET)
     public List<AbstractJob> getJobs(@RequestParam(value = "type",
-            defaultValue = "") String type) {
+        defaultValue = "") String type) {
         return jobService.getAliveJobs(type);
     }
 
@@ -69,30 +69,30 @@ public class JobController {
     @RequestMapping(value = "/jobs/{id}", method = RequestMethod.PUT)
     @ResponseStatus(HttpStatus.OK)
     public AbstractJob onActions(
-            @PathVariable("id") Long jobId,
-            @RequestParam String action) throws Exception {
+        @PathVariable("id") Long jobId,
+        @RequestParam String action) throws Exception {
         return jobService.onAction(jobId, action);
     }
 
     @RequestMapping(value = "/jobs", method = RequestMethod.DELETE)
     @ResponseStatus(HttpStatus.NO_CONTENT)
     public void deleteJob(@RequestParam("jobName") String jobName)
-            throws SchedulerException {
+        throws SchedulerException {
         jobService.deleteJob(jobName);
     }
 
     @RequestMapping(value = "/jobs/{id}", method = RequestMethod.DELETE)
     @ResponseStatus(HttpStatus.NO_CONTENT)
     public void deleteJob(@PathVariable("id") Long id)
-            throws SchedulerException {
+        throws SchedulerException {
         jobService.deleteJob(id);
     }
 
     @RequestMapping(value = "/jobs/instances", method = RequestMethod.GET)
     public List<JobInstanceBean> findInstancesOfJob(
-            @RequestParam("jobId") Long id,
-            @RequestParam("page") int page,
-            @RequestParam("size") int size) {
+        @RequestParam("jobId") Long id,
+        @RequestParam("page") int page,
+        @RequestParam("size") int size) {
         return jobService.findInstancesOfJob(id, page, size);
     }
 
@@ -115,10 +115,10 @@ public class JobController {
         InputStreamResource resource = new InputStreamResource(
             FSUtil.getMissSampleInputStream(path));
         return ResponseEntity.ok().
-                header("content-disposition",
-                        "attachment; filename = sampleMissingData.json")
-                .contentType(MediaType.APPLICATION_OCTET_STREAM)
-                .body(resource);
+            header("content-disposition",
+                "attachment; filename = sampleMissingData.json")
+            .contentType(MediaType.APPLICATION_OCTET_STREAM)
+            .body(resource);
     }
 
     @RequestMapping(value = "/jobs/trigger/{id}", method = RequestMethod.POST)
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
index c721d27..3b0f768 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
@@ -78,7 +78,7 @@ import org.springframework.transaction.annotation.Transactional;
 @DisallowConcurrentExecution
 public class JobInstance implements Job {
     private static final Logger LOGGER = LoggerFactory
-            .getLogger(JobInstance.class);
+        .getLogger(JobInstance.class);
     public static final String MEASURE_KEY = "measure";
     public static final String PREDICATES_KEY = "predicts";
     public static final String PREDICATE_JOB_NAME = "predicateJobName";
@@ -88,7 +88,7 @@ public class JobInstance implements Job {
     public static final String INTERVAL = "interval";
     public static final String REPEAT = "repeat";
     public static final String CHECK_DONEFILE_SCHEDULE =
-            "checkdonefile.schedule";
+        "checkdonefile.schedule";
 
     @Autowired
     private SchedulerFactoryBean factory;
@@ -119,7 +119,7 @@ public class JobInstance implements Job {
     }
 
     private void initParam(JobExecutionContext context)
-            throws SchedulerException {
+        throws SchedulerException {
         mPredicates = new ArrayList<>();
         JobDetail jobDetail = context.getJobDetail();
         Long jobId = jobDetail.getJobDataMap().getLong(GRIFFIN_JOB_ID);
@@ -135,18 +135,17 @@ public class JobInstance implements Job {
 
     @SuppressWarnings("unchecked")
     private void setJobStartTime(JobDetail jobDetail)
-            throws SchedulerException {
+        throws SchedulerException {
         Scheduler scheduler = factory.getScheduler();
         JobKey jobKey = jobDetail.getKey();
         List<Trigger> triggers =
-                (List<Trigger>) scheduler.getTriggersOfJob(jobKey);
+            (List<Trigger>) scheduler.getTriggersOfJob(jobKey);
         Date triggerTime = triggers.get(0).getPreviousFireTime();
         jobStartTime = triggerTime.getTime();
     }
 
-
     private void setSourcesPartitionsAndPredicates(List<DataSource> sources)
-            throws Exception {
+        throws Exception {
         boolean isFirstBaseline = true;
         for (JobDataSegment jds : job.getSegments()) {
             if (jds.isAsTsBaseline() && isFirstBaseline) {
@@ -162,17 +161,16 @@ public class JobInstance implements Job {
     }
 
     private void setDataSourcePartitions(JobDataSegment jds, DataSource ds)
-            throws Exception {
+        throws Exception {
         List<DataConnector> connectors = ds.getConnectors();
         for (DataConnector dc : connectors) {
             setDataConnectorPartitions(jds, dc);
         }
     }
 
-
     private void setDataConnectorPartitions(
-            JobDataSegment jds,
-            DataConnector dc) throws Exception {
+        JobDataSegment jds,
+        DataConnector dc) throws Exception {
         String dcName = jds.getDataConnectorName();
         if (dcName.equals(dc.getName())) {
             Long[] sampleTs = genSampleTs(jds.getSegmentRange(), dc);
@@ -193,7 +191,7 @@ public class JobInstance implements Job {
         Long range = TimeUtil.str2Long(segRange.getLength());
         String unit = dc.getDataUnit();
         Long dataUnit = TimeUtil.str2Long(StringUtils.isEmpty(unit) ? dc
-                .getDefaultDataUnit() : unit);
+            .getDefaultDataUnit() : unit);
         //offset usually is negative
         Long dataStartTime = jobStartTime + offset;
         if (range < 0) {
@@ -221,8 +219,8 @@ public class JobInstance implements Job {
         List<SegmentPredicate> predicates = dc.getPredicates();
         for (SegmentPredicate predicate : predicates) {
             genConfMap(predicate.getConfigMap(),
-                    sampleTs,
-                    dc.getDataTimeZone());
+                sampleTs,
+                dc.getDataTimeZone());
             //Do not forget to update origin string config
             predicate.setConfigMap(predicate.getConfigMap());
             mPredicates.add(predicate);
@@ -234,7 +232,6 @@ public class JobInstance implements Job {
         dc.setConfigMap(dc.getConfigMap());
     }
 
-
     /**
      * @param conf     config map
      * @param sampleTs collection of data split start timestamp
@@ -244,9 +241,8 @@ public class JobInstance implements Job {
      * or like {"path": "/year=2017/month=11/dt=15/hour=09/_DONE
      * ,/year=2017/month=11/dt=15/hour=10/_DONE"}
      */
-
     private void genConfMap(Map<String, Object> conf, Long[] sampleTs, String
-            timezone) {
+        timezone) {
         if (conf == null) {
             LOGGER.warn("Predicate config is null.");
             return;
@@ -261,29 +257,28 @@ public class JobInstance implements Job {
                 }
                 for (Long timestamp : sampleTs) {
                     set.add(TimeUtil.format(value, timestamp,
-                            TimeUtil.getTimeZone(timezone)));
+                        TimeUtil.getTimeZone(timezone)));
                 }
                 conf.put(entry.getKey(), StringUtils.join(set,
-                        PATH_CONNECTOR_CHARACTER));
+                    PATH_CONNECTOR_CHARACTER));
             }
         }
     }
 
     @SuppressWarnings("unchecked")
     private void createJobInstance(Map<String, Object> confMap)
-            throws Exception {
+        throws Exception {
         confMap = checkConfMap(confMap != null ? confMap : new HashMap<>());
         Map<String, Object> config = (Map<String, Object>) confMap
-                .get(CHECK_DONEFILE_SCHEDULE);
+            .get(CHECK_DONEFILE_SCHEDULE);
         Long interval = TimeUtil.str2Long((String) config.get(INTERVAL));
         Integer repeat = Integer.valueOf(config.get(REPEAT).toString());
         String groupName = "PG";
-        String jobName = job.getJobName() + "_predicate_" + System
-                .currentTimeMillis();
+        String jobName = job.getJobName() + "_predicate_"
+            + System.currentTimeMillis();
         TriggerKey tk = triggerKey(jobName, groupName);
         if (factory.getScheduler().checkExists(tk)) {
-            throw new GriffinException.ConflictException
-                    (QUARTZ_JOB_ALREADY_EXIST);
+            throw new GriffinException.ConflictException(QUARTZ_JOB_ALREADY_EXIST);
         }
         String triggerKey = (String) confMap.get(TRIGGER_KEY);
         saveJobInstance(jobName, groupName, triggerKey);
@@ -293,7 +288,7 @@ public class JobInstance implements Job {
     @SuppressWarnings("unchecked")
     Map<String, Object> checkConfMap(Map<String, Object> confMap) {
         Map<String, Object> config = (Map<String, Object>) confMap.get
-                (CHECK_DONEFILE_SCHEDULE);
+            (CHECK_DONEFILE_SCHEDULE);
         String interval = env.getProperty("predicate.job.interval");
         interval = interval != null ? interval : "5m";
         String repeat = env.getProperty("predicate.job.repeat.count");
@@ -316,38 +311,36 @@ public class JobInstance implements Job {
 
     private void saveJobInstance(String pName, String pGroup, String triggerKey) {
         ProcessType type = measure.getProcessType() == BATCH ? BATCH :
-                STREAMING;
+            STREAMING;
         Long tms = System.currentTimeMillis();
         String expired = env.getProperty("jobInstance.expired.milliseconds");
         Long expireTms = Long.valueOf(expired != null ? expired : "604800000")
-                + tms;
+            + tms;
         JobInstanceBean instance = new JobInstanceBean(FINDING, pName, pGroup,
-                tms, expireTms, type);
+            tms, expireTms, type);
         instance.setJob(job);
         instance.setTriggerKey(triggerKey);
         instanceRepo.save(instance);
     }
 
-
     private void createJobInstance(TriggerKey tk, Long interval, Integer
-            repeatCount, String pJobName) throws Exception {
+        repeatCount, String pJobName) throws Exception {
         JobDetail jobDetail = addJobDetail(tk, pJobName);
         Trigger trigger = genTriggerInstance(tk, jobDetail, interval,
-                repeatCount);
+            repeatCount);
         factory.getScheduler().scheduleJob(trigger);
     }
 
-
     private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, Long
-            interval, Integer repeatCount) {
+        interval, Integer repeatCount) {
         return newTrigger().withIdentity(tk).forJob(jd).startNow()
-                .withSchedule(simpleSchedule().withIntervalInMilliseconds
-                        (interval).withRepeatCount(repeatCount))
-                .build();
+            .withSchedule(simpleSchedule().withIntervalInMilliseconds
+                (interval).withRepeatCount(repeatCount))
+            .build();
     }
 
     private JobDetail addJobDetail(TriggerKey tk, String pJobName)
-            throws SchedulerException, IOException {
+        throws SchedulerException, IOException {
         Scheduler scheduler = factory.getScheduler();
         JobKey jobKey = jobKey(tk.getName(), tk.getGroup());
         JobDetail jobDetail;
@@ -356,9 +349,9 @@ public class JobInstance implements Job {
             jobDetail = scheduler.getJobDetail(jobKey);
         } else {
             jobDetail = newJob(SparkSubmitJob.class)
-                    .storeDurably()
-                    .withIdentity(jobKey)
-                    .build();
+                .storeDurably()
+                .withIdentity(jobKey)
+                .build();
         }
         setJobDataMap(jobDetail, pJobName);
         scheduler.addJob(jobDetail, isJobKeyExist);
@@ -366,7 +359,7 @@ public class JobInstance implements Job {
     }
 
     private void setJobDataMap(JobDetail jobDetail, String pJobName)
-            throws IOException {
+        throws IOException {
         JobDataMap dataMap = jobDetail.getJobDataMap();
         preProcessMeasure();
         String result = toJson(measure);
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobOperator.java b/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
index 81c3b17..05279f9 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
@@ -27,7 +27,7 @@ import org.quartz.SchedulerException;
 
 public interface JobOperator {
     AbstractJob add(AbstractJob job, GriffinMeasure measure)
-            throws Exception;
+        throws Exception;
 
     void start(AbstractJob job) throws Exception;
 
@@ -36,8 +36,8 @@ public interface JobOperator {
     void delete(AbstractJob job) throws SchedulerException;
 
     JobHealth getHealth(JobHealth jobHealth, AbstractJob job)
-            throws SchedulerException;
+        throws SchedulerException;
 
     JobState getState(AbstractJob job, String action)
-            throws SchedulerException;
+        throws SchedulerException;
 }
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
index bd8988c..f41eb2a 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
@@ -101,7 +101,6 @@ import org.springframework.stereotype.Service;
 import org.springframework.util.CollectionUtils;
 import org.springframework.web.client.HttpClientErrorException;
 import org.springframework.web.client.ResourceAccessException;
-import org.springframework.web.client.RestTemplate;
 
 @Service
 public class JobServiceImpl implements JobService {
@@ -686,9 +685,9 @@ public class JobServiceImpl implements JobService {
         JobKey jobKey = jobKey(job.getName(), job.getGroup());
         if (scheduler.checkExists(jobKey)) {
             Trigger trigger = TriggerBuilder.newTrigger()
-                    .forJob(jobKey)
-                    .startNow()
-                    .build();
+                .forJob(jobKey)
+                .startNow()
+                .build();
             scheduler.scheduleJob(trigger);
             return trigger.getKey().toString();
         } else {
diff --git a/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java b/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java
index d9ee9e8..cf50527 100644
--- a/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java
+++ b/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java
@@ -19,8 +19,26 @@ under the License.
 
 package org.apache.griffin.core.job;
 
+import static org.apache.griffin.core.config.PropertiesConfig.livyConfMap;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_FOUND;
+import static org.apache.griffin.core.util.JsonUtil.toEntity;
+import static org.apache.griffin.core.util.JsonUtil.toJsonWithFormat;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.PostConstruct;
+
 import org.apache.commons.collections.map.HashedMap;
 import org.quartz.JobDetail;
 import org.slf4j.Logger;
@@ -35,29 +53,13 @@ import org.springframework.security.kerberos.client.KerberosRestTemplate;
 import org.springframework.stereotype.Component;
 import org.springframework.web.client.RestTemplate;
 
-import javax.annotation.PostConstruct;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.apache.griffin.core.config.PropertiesConfig.livyConfMap;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_FOUND;
-import static org.apache.griffin.core.util.JsonUtil.toEntity;
-import static org.apache.griffin.core.util.JsonUtil.toJsonWithFormat;
-
 @Component
 public class LivyTaskSubmitHelper {
-
-    private static final Logger logger = LoggerFactory.getLogger(LivyTaskSubmitHelper.class);
-
+    private static final Logger LOGGER = LoggerFactory.getLogger(LivyTaskSubmitHelper.class);
     private static final String REQUEST_BY_HEADER = "X-Requested-By";
+    public static final int DEFAULT_QUEUE_SIZE = 20000;
+    private static final int SLEEP_TIME = 300;
+
     private SparkSubmitJob sparkSubmitJob;
     private ConcurrentMap<Long, Integer> taskAppIdMap = new ConcurrentHashMap<>();
     // Current number of tasks
@@ -66,8 +68,6 @@ public class LivyTaskSubmitHelper {
     private RestTemplate restTemplate = new RestTemplate();
     // queue for pub or sub
     private BlockingQueue<JobDetail> queue;
-    public static final int DEFAULT_QUEUE_SIZE = 20000;
-    private static final int SLEEP_TIME = 300;
     private String uri;
 
     @Value("${livy.task.max.concurrent.count:20}")
@@ -75,7 +75,6 @@ public class LivyTaskSubmitHelper {
     @Value("${livy.task.submit.interval.second:3}")
     private int batchIntervalSecond;
 
-
     @Autowired
     private Environment env;
 
@@ -86,7 +85,7 @@ public class LivyTaskSubmitHelper {
     public void init() {
         startWorker();
         uri = env.getProperty("livy.uri");
-        logger.info("Livy uri : {}", uri);
+        LOGGER.info("Livy uri : {}", uri);
     }
 
     public LivyTaskSubmitHelper() {
@@ -110,20 +109,19 @@ public class LivyTaskSubmitHelper {
      */
     public void addTaskToWaitingQueue(JobDetail jd) throws IOException {
         if (jd == null) {
-            logger.warn("task is blank, workerNamePre: {}", workerNamePre);
+            LOGGER.warn("task is blank, workerNamePre: {}", workerNamePre);
             return;
         }
 
         if (queue.remainingCapacity() <= 0) {
-            logger.warn("task is discard, workerNamePre: {}, task: {}", workerNamePre, jd);
+            LOGGER.warn("task is discard, workerNamePre: {}, task: {}", workerNamePre, jd);
             sparkSubmitJob.saveJobInstance(null, NOT_FOUND);
             return;
         }
 
         queue.add(jd);
-
-        logger.info("add_task_to_waiting_queue_success, workerNamePre: {}, task: {}",
-                workerNamePre, jd);
+        LOGGER.info("add_task_to_waiting_queue_success, workerNamePre: {}, task: {}",
+            workerNamePre, jd);
     }
 
     /**
@@ -141,7 +139,7 @@ public class LivyTaskSubmitHelper {
             while (true) {
                 try {
                     if (curConcurrentTaskNum.get() < maxConcurrentTaskCount
-                            && (System.currentTimeMillis() - insertTime) >= batchIntervalSecond * 1000) {
+                        && (System.currentTimeMillis() - insertTime) >= batchIntervalSecond * 1000) {
                         JobDetail jd = queue.take();
                         sparkSubmitJob.saveJobInstance(jd);
                         insertTime = System.currentTimeMillis();
@@ -149,7 +147,7 @@ public class LivyTaskSubmitHelper {
                         Thread.sleep(SLEEP_TIME);
                     }
                 } catch (Exception e) {
-                    logger.error("Async_worker_doTask_failed, {}", e.getMessage(), e);
+                    LOGGER.error("Async_worker_doTask_failed, {}", e.getMessage(), e);
                     es.execute(this);
                 }
             }
@@ -181,12 +179,12 @@ public class LivyTaskSubmitHelper {
     }
 
     protected Map<String, Object> retryLivyGetAppId(String result, int appIdRetryCount)
-            throws IOException {
+        throws IOException {
 
         int retryCount = appIdRetryCount;
         TypeReference<HashMap<String, Object>> type =
-                new TypeReference<HashMap<String, Object>>() {
-                };
+            new TypeReference<HashMap<String, Object>>() {
+            };
         Map<String, Object> resultMap = toEntity(result, type);
 
         if (retryCount <= 0) {
@@ -206,10 +204,10 @@ public class LivyTaskSubmitHelper {
             try {
                 Thread.sleep(SLEEP_TIME);
             } catch (InterruptedException e) {
-                logger.error(e.getMessage(), e);
+                LOGGER.error(e.getMessage(), e);
             }
             resultMap = getResultByLivyId(livyBatchesId, type);
-            logger.info("retry get livy resultMap: {}, batches id : {}", resultMap, livyBatchesId);
+            LOGGER.info("retry get livy resultMap: {}, batches id : {}", resultMap, livyBatchesId);
 
             if (resultMap.get("appId") != null) {
                 break;
@@ -220,104 +218,102 @@ public class LivyTaskSubmitHelper {
     }
 
     private Map<String, Object> getResultByLivyId(Object livyBatchesId, TypeReference<HashMap<String, Object>> type)
-            throws IOException {
+        throws IOException {
         Map<String, Object> resultMap = new HashedMap();
         String livyUri = uri + "/" + livyBatchesId;
         String result = getFromLivy(livyUri);
-        logger.info(result);
+        LOGGER.info(result);
         return result == null ? resultMap : toEntity(result, type);
     }
 
     public String postToLivy(String uri) {
-        logger.info("Post To Livy URI is: " + uri);
+        LOGGER.info("Post To Livy URI is: " + uri);
         String needKerberos = env.getProperty("livy.need.kerberos");
-        logger.info("Need Kerberos:" + needKerberos);
+        LOGGER.info("Need Kerberos:" + needKerberos);
 
         HttpHeaders headers = new HttpHeaders();
         headers.setContentType(MediaType.APPLICATION_JSON);
-        headers.set(REQUEST_BY_HEADER,"admin");
+        headers.set(REQUEST_BY_HEADER, "admin");
 
         if (needKerberos == null || needKerberos.isEmpty()) {
-            logger.error("The property \"livy.need.kerberos\" is empty");
+            LOGGER.error("The property \"livy.need.kerberos\" is empty");
             return null;
         }
 
         if (needKerberos.equalsIgnoreCase("false")) {
-            logger.info("The livy server doesn't need Kerberos Authentication");
+            LOGGER.info("The livy server doesn't need Kerberos Authentication");
             String result = null;
             try {
-
-                HttpEntity<String> springEntity = new HttpEntity<>(toJsonWithFormat(livyConfMap),headers);
-                result = restTemplate.postForObject(uri,springEntity,String.class);
-
-                logger.info(result);
+                HttpEntity<String> springEntity = new HttpEntity<>(toJsonWithFormat(livyConfMap), headers);
+                result = restTemplate.postForObject(uri, springEntity, String.class);
+                LOGGER.info(result);
             } catch (JsonProcessingException e) {
-                logger.error("Post to livy ERROR. \n {}", e.getMessage());
+                LOGGER.error("Post to livy ERROR. \n {}", e.getMessage());
             }
             return result;
         } else {
-            logger.info("The livy server needs Kerberos Authentication");
+            LOGGER.info("The livy server needs Kerberos Authentication");
             String userPrincipal = env.getProperty("livy.server.auth.kerberos.principal");
             String keyTabLocation = env.getProperty("livy.server.auth.kerberos.keytab");
-            logger.info("principal:{}, lcoation:{}", userPrincipal, keyTabLocation);
+            LOGGER.info("principal:{}, lcoation:{}", userPrincipal, keyTabLocation);
 
             KerberosRestTemplate restTemplate = new KerberosRestTemplate(keyTabLocation, userPrincipal);
             HttpEntity<String> springEntity = null;
             try {
                 springEntity = new HttpEntity<>(toJsonWithFormat(livyConfMap), headers);
             } catch (JsonProcessingException e) {
-                e.printStackTrace();
+                LOGGER.error("Json Parsing failed, {}", e.getMessage(), e);
             }
             String result = restTemplate.postForObject(uri, springEntity, String.class);
-            logger.info(result);
+            LOGGER.info(result);
             return result;
         }
     }
 
     public String getFromLivy(String uri) {
-        logger.info("Get From Livy URI is: " + uri);
+        LOGGER.info("Get From Livy URI is: " + uri);
         String needKerberos = env.getProperty("livy.need.kerberos");
-        logger.info("Need Kerberos:" + needKerberos);
+        LOGGER.info("Need Kerberos:" + needKerberos);
 
         if (needKerberos == null || needKerberos.isEmpty()) {
-            logger.error("The property \"livy.need.kerberos\" is empty");
+            LOGGER.error("The property \"livy.need.kerberos\" is empty");
             return null;
         }
 
         if (needKerberos.equalsIgnoreCase("false")) {
-            logger.info("The livy server doesn't need Kerberos Authentication");
+            LOGGER.info("The livy server doesn't need Kerberos Authentication");
             return restTemplate.getForObject(uri, String.class);
         } else {
-            logger.info("The livy server needs Kerberos Authentication");
+            LOGGER.info("The livy server needs Kerberos Authentication");
             String userPrincipal = env.getProperty("livy.server.auth.kerberos.principal");
             String keyTabLocation = env.getProperty("livy.server.auth.kerberos.keytab");
-            logger.info("principal:{}, lcoation:{}", userPrincipal, keyTabLocation);
+            LOGGER.info("principal:{}, lcoation:{}", userPrincipal, keyTabLocation);
 
             KerberosRestTemplate restTemplate = new KerberosRestTemplate(keyTabLocation, userPrincipal);
             String result = restTemplate.getForObject(uri, String.class);
-            logger.info(result);
+            LOGGER.info(result);
             return result;
         }
     }
 
     public void deleteByLivy(String uri) {
-        logger.info("Delete by Livy URI is: " + uri);
+        LOGGER.info("Delete by Livy URI is: " + uri);
         String needKerberos = env.getProperty("livy.need.kerberos");
-        logger.info("Need Kerberos:" + needKerberos);
+        LOGGER.info("Need Kerberos:" + needKerberos);
 
         if (needKerberos == null || needKerberos.isEmpty()) {
-            logger.error("The property \"livy.need.kerberos\" is empty");
+            LOGGER.error("The property \"livy.need.kerberos\" is empty");
             return;
         }
 
         if (needKerberos.equalsIgnoreCase("false")) {
-            logger.info("The livy server doesn't need Kerberos Authentication");
+            LOGGER.info("The livy server doesn't need Kerberos Authentication");
             new RestTemplate().delete(uri);
         } else {
-            logger.info("The livy server needs Kerberos Authentication");
+            LOGGER.info("The livy server needs Kerberos Authentication");
             String userPrincipal = env.getProperty("livy.server.auth.kerberos.principal");
             String keyTabLocation = env.getProperty("livy.server.auth.kerberos.keytab");
-            logger.info("principal:{}, lcoation:{}", userPrincipal, keyTabLocation);
+            LOGGER.info("principal:{}, lcoation:{}", userPrincipal, keyTabLocation);
 
             KerberosRestTemplate restTemplate = new KerberosRestTemplate(keyTabLocation, userPrincipal);
             restTemplate.delete(uri);
diff --git a/service/src/main/java/org/apache/griffin/core/job/Predicator.java b/service/src/main/java/org/apache/griffin/core/job/Predicator.java
index dd9e105..153157e 100644
--- a/service/src/main/java/org/apache/griffin/core/job/Predicator.java
+++ b/service/src/main/java/org/apache/griffin/core/job/Predicator.java
@@ -21,6 +21,16 @@ package org.apache.griffin.core.job;
 
 import java.io.IOException;
 
+/**
+ * A Predicator is an object that judges whether a condition is met.
+ * This interface has only one method, {@link #predicate()}.
+ */
 public interface Predicator {
+    /**
+     * Evaluates the predicate condition.
+     *
+     * @return true if the condition is met, false otherwise
+     * @throws IOException if an I/O error occurs while evaluating the condition
+     */
     boolean predicate() throws IOException;
 }
diff --git a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
index 01e41ca..e48053d 100644
--- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
@@ -19,7 +19,27 @@ under the License.
 
 package org.apache.griffin.core.job;
 
+import static org.apache.griffin.core.config.EnvConfig.ENV_BATCH;
+import static org.apache.griffin.core.config.EnvConfig.ENV_STREAMING;
+import static org.apache.griffin.core.config.PropertiesConfig.livyConfMap;
+import static org.apache.griffin.core.job.JobInstance.JOB_NAME;
+import static org.apache.griffin.core.job.JobInstance.MEASURE_KEY;
+import static org.apache.griffin.core.job.JobInstance.PREDICATES_KEY;
+import static org.apache.griffin.core.job.JobInstance.PREDICATE_JOB_NAME;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_FOUND;
+import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH;
+import static org.apache.griffin.core.util.JsonUtil.toEntity;
+
 import com.fasterxml.jackson.core.type.TypeReference;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.griffin.core.job.entity.JobInstanceBean;
 import org.apache.griffin.core.job.entity.SegmentPredicate;
 import org.apache.griffin.core.job.factory.PredicatorFactory;
@@ -42,31 +62,21 @@ import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
-import org.springframework.web.client.RestTemplate;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.griffin.core.config.EnvConfig.ENV_BATCH;
-import static org.apache.griffin.core.config.EnvConfig.ENV_STREAMING;
-import static org.apache.griffin.core.config.PropertiesConfig.livyConfMap;
-import static org.apache.griffin.core.job.JobInstance.*;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_FOUND;
-import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH;
-import static org.apache.griffin.core.util.JsonUtil.toEntity;
 
+/**
+ * Simple implementation of the Quartz Job interface, submitting the
+ * griffin job to spark cluster via livy
+ *
+ * @see LivyTaskSubmitHelper#postToLivy(String)
+ * @see Job#execute(JobExecutionContext) 
+ */
 @PersistJobDataAfterExecution
 @DisallowConcurrentExecution
 @Component
 public class SparkSubmitJob implements Job {
     private static final Logger LOGGER =
-            LoggerFactory.getLogger(SparkSubmitJob.class);
-    private static final String REQUEST_BY_HEADER = "X-Requested-By";
+        LoggerFactory.getLogger(SparkSubmitJob.class);
+
     @Autowired
     private JobInstanceRepo jobInstanceRepo;
     @Autowired
@@ -85,7 +95,6 @@ public class SparkSubmitJob implements Job {
     private String livyUri;
     private List<SegmentPredicate> mPredicates;
     private JobInstanceBean jobInstance;
-    private RestTemplate restTemplate = new RestTemplate();
 
     @Override
     public void execute(JobExecutionContext context) {
@@ -108,8 +117,8 @@ public class SparkSubmitJob implements Job {
         }
     }
 
-    private void updateJobInstanceState(JobExecutionContext context) throws
-            IOException {
+    private void updateJobInstanceState(JobExecutionContext context)
+        throws IOException {
         SimpleTrigger simpleTrigger = (SimpleTrigger) context.getTrigger();
         int repeatCount = simpleTrigger.getRepeatCount();
         int fireCount = simpleTrigger.getTimesTriggered();
@@ -126,9 +135,10 @@ public class SparkSubmitJob implements Job {
         if (CollectionUtils.isEmpty(predicates)) {
             return true;
         }
+
         for (SegmentPredicate segPredicate : predicates) {
             Predicator predicator = PredicatorFactory
-                    .newPredicateInstance(segPredicate);
+                .newPredicateInstance(segPredicate);
             try {
                 if (predicator != null && !predicator.predicate()) {
                     return false;
@@ -136,7 +146,6 @@ public class SparkSubmitJob implements Job {
             } catch (Exception e) {
                 return false;
             }
-
         }
         return true;
     }
@@ -144,9 +153,9 @@ public class SparkSubmitJob implements Job {
     private void initParam(JobDetail jd) throws IOException {
         mPredicates = new ArrayList<>();
         jobInstance = jobInstanceRepo.findByPredicateName(jd.getJobDataMap()
-                .getString(PREDICATE_JOB_NAME));
+            .getString(PREDICATE_JOB_NAME));
         measure = toEntity(jd.getJobDataMap().getString(MEASURE_KEY),
-                GriffinMeasure.class);
+            GriffinMeasure.class);
         livyUri = env.getProperty("livy.uri");
         setPredicates(jd.getJobDataMap().getString(PREDICATES_KEY));
         // in order to keep metric name unique, we set job name
@@ -160,12 +169,13 @@ public class SparkSubmitJob implements Job {
             return;
         }
         List<SegmentPredicate> predicates = toEntity(json,
-                new TypeReference<List<SegmentPredicate>>() {
-                });
+            new TypeReference<List<SegmentPredicate>>() {
+            });
         if (predicates != null) {
             mPredicates.addAll(predicates);
         }
     }
+
     private String escapeCharacter(String str, String regex) {
         if (StringUtils.isEmpty(str)) {
             return str;
@@ -196,9 +206,8 @@ public class SparkSubmitJob implements Job {
         livyConfMap.put("args", args);
     }
 
-
     protected void saveJobInstance(JobDetail jd) throws SchedulerException,
-            IOException {
+        IOException {
         // If result is null, it may livy uri is wrong
         // or livy parameter is wrong.
         Map<String, Object> resultMap = post2LivyWithRetry();
@@ -211,13 +220,14 @@ public class SparkSubmitJob implements Job {
     }
 
     private Map<String, Object> post2LivyWithRetry()
-            throws IOException {
+        throws IOException {
         String result = post2Livy();
         Map<String, Object> resultMap = null;
         if (result != null) {
-            resultMap = livyTaskSubmitHelper.retryLivyGetAppId(result,appIdRetryCount);
+            resultMap = livyTaskSubmitHelper.retryLivyGetAppId(result, appIdRetryCount);
             if (resultMap != null) {
-                livyTaskSubmitHelper.increaseCurTaskNum(Long.valueOf(String.valueOf(resultMap.get("id"))).longValue());
+                livyTaskSubmitHelper.increaseCurTaskNum(Long.valueOf(
+                    String.valueOf(resultMap.get("id"))).longValue());
             }
         }
 
@@ -225,9 +235,10 @@ public class SparkSubmitJob implements Job {
     }
 
     protected void saveJobInstance(String result, State state)
-            throws IOException {
+        throws IOException {
         TypeReference<HashMap<String, Object>> type =
-                new TypeReference<HashMap<String, Object>>() {};
+            new TypeReference<HashMap<String, Object>>() {
+            };
         Map<String, Object> resultMap = null;
         if (result != null) {
             resultMap = toEntity(result, type);
@@ -244,9 +255,9 @@ public class SparkSubmitJob implements Job {
             Object id = resultMap.get("id");
             Object appId = resultMap.get("appId");
             jobInstance.setState(status == null ? null : State.valueOf(status
-                    .toString().toUpperCase()));
+                .toString().toUpperCase()));
             jobInstance.setSessionId(id == null ? null : Long.parseLong(id
-                    .toString()));
+                .toString()));
             jobInstance.setAppId(appId == null ? null : appId.toString());
         }
     }
diff --git a/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java
index e7ff758..e3fab06 100644
--- a/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java
@@ -60,7 +60,7 @@ import org.springframework.web.client.RestClientException;
 @Service
 public class StreamingJobOperatorImpl implements JobOperator {
     private static final Logger LOGGER = LoggerFactory
-            .getLogger(StreamingJobOperatorImpl.class);
+        .getLogger(StreamingJobOperatorImpl.class);
     @Autowired
     private StreamingJobRepo streamingJobRepo;
     @Autowired
@@ -84,7 +84,7 @@ public class StreamingJobOperatorImpl implements JobOperator {
     @Override
     @Transactional(rollbackFor = Exception.class)
     public AbstractJob add(AbstractJob job, GriffinMeasure measure) throws
-            Exception {
+        Exception {
         validateParams(job);
         String qName = jobService.getQuartzName(job);
         String qGroup = jobService.getQuartzGroup();
@@ -126,24 +126,24 @@ public class StreamingJobOperatorImpl implements JobOperator {
         /* Firstly you should check whether job is scheduled.
         If it is scheduled, triggers are empty. */
         List<? extends Trigger> triggers = jobService.getTriggers(
-                job.getName(),
-                job.getGroup());
+            job.getName(),
+            job.getGroup());
         if (!CollectionUtils.isEmpty(triggers)) {
             throw new GriffinException.BadRequestException
-                    (STREAMING_JOB_IS_RUNNING);
+                (STREAMING_JOB_IS_RUNNING);
         }
         /* Secondly you should check whether job instance is running. */
         List<JobInstanceBean> instances = instanceRepo.findByJobId(job.getId());
         instances.stream().filter(instance -> !instance.isDeleted()).forEach
-                (instance -> {
-                    State state = instance.getState();
-                    String quartzState = convert2QuartzState(state);
-                    if (!getStartStatus(quartzState)) {
-                        throw new GriffinException.BadRequestException
-                                (STREAMING_JOB_IS_RUNNING);
-                    }
-                    instance.setDeleted(true);
-                });
+            (instance -> {
+                State state = instance.getState();
+                String quartzState = convert2QuartzState(state);
+                if (!getStartStatus(quartzState)) {
+                    throw new GriffinException.BadRequestException
+                        (STREAMING_JOB_IS_RUNNING);
+                }
+                instance.setDeleted(true);
+            });
     }
 
 
@@ -174,7 +174,7 @@ public class StreamingJobOperatorImpl implements JobOperator {
     public JobState getState(AbstractJob job, String action) {
         JobState jobState = new JobState();
         List<JobInstanceBean> instances = instanceRepo
-                .findByJobId(job.getId());
+            .findByJobId(job.getId());
         for (JobInstanceBean instance : instances) {
             State state = instance.getState();
             if (!instance.isDeleted() && state != null) {
@@ -208,7 +208,7 @@ public class StreamingJobOperatorImpl implements JobOperator {
      * started
      */
     private boolean getStartStatus(String state) {
-        return !"NORMAL" .equals(state) && !"BLOCKED" .equals(state);
+        return !"NORMAL".equals(state) && !"BLOCKED".equals(state);
     }
 
     /**
@@ -219,15 +219,15 @@ public class StreamingJobOperatorImpl implements JobOperator {
      * stopped
      */
     private boolean getStopStatus(String state) {
-        return !"COMPLETE" .equals(state) && !"ERROR" .equals(state);
+        return !"COMPLETE".equals(state) && !"ERROR".equals(state);
     }
 
     private void deleteByLivy(JobInstanceBean instance) {
         Long sessionId = instance.getSessionId();
         if (sessionId == null) {
             LOGGER.warn("Session id of instance({},{}) is null.", instance
-                    .getPredicateGroup(), instance.getPredicateName
-                    ());
+                .getPredicateGroup(), instance.getPredicateName
+                ());
             return;
         }
         String url = livyUri + "/" + instance.getSessionId();
@@ -236,15 +236,15 @@ public class StreamingJobOperatorImpl implements JobOperator {
             livyTaskSubmitHelper.deleteByLivy(url);
 
             LOGGER.info("Job instance({}) has been deleted. {}", instance
-                    .getSessionId(), url);
+                .getSessionId(), url);
         } catch (ResourceAccessException e) {
             LOGGER.error("Your url may be wrong. Please check {}.\n {}",
-                    livyUri, e.getMessage());
+                livyUri, e.getMessage());
         } catch (RestClientException e) {
             LOGGER.warn("sessionId({}) appId({}) {}.", instance.getSessionId(),
-                    instance.getAppId(), e.getMessage());
+                instance.getAppId(), e.getMessage());
             YarnNetUtil.delete(env.getProperty("yarn.uri"),
-                    instance.getAppId());
+                instance.getAppId());
         }
     }
 
@@ -255,23 +255,23 @@ public class StreamingJobOperatorImpl implements JobOperator {
      *               job
      */
     private void stop(StreamingJob job, boolean delete) throws
-            SchedulerException {
+        SchedulerException {
         pauseJob(job);
         /* to prevent situation that streaming job is submitted
         before pause or when pausing. */
         List<JobInstanceBean> instances = instanceRepo
-                .findByJobId(job.getId());
+            .findByJobId(job.getId());
         instances.stream().filter(instance -> !instance.isDeleted())
-                .forEach(instance -> {
-                    State state = instance.getState();
-                    String quartzState = convert2QuartzState(state);
-                    if (getStopStatus(quartzState)) {
-                        deleteByLivy(instance);
-
-                    }
-                    instance.setState(STOPPED);
-                    instance.setDeleted(true);
-                });
+            .forEach(instance -> {
+                State state = instance.getState();
+                String quartzState = convert2QuartzState(state);
+                if (getStopStatus(quartzState)) {
+                    deleteByLivy(instance);
+
+                }
+                instance.setState(STOPPED);
+                instance.setDeleted(true);
+            });
         job.setDeleted(delete);
         streamingJobRepo.save(job);
     }
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java
index 660eb27..1c48adf 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java
@@ -57,20 +57,20 @@ import org.slf4j.LoggerFactory;
 @Table(name = "job")
 @Inheritance(strategy = InheritanceType.SINGLE_TABLE)
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY,
-        property = "job.type")
+    property = "job.type")
 @JsonSubTypes({@JsonSubTypes.Type(value = BatchJob.class, name = "batch"),
-        @JsonSubTypes.Type(
-                value = StreamingJob.class,
-                name = "streaming"),
-        @JsonSubTypes.Type(
-                value = VirtualJob.class,
-                name = "virtual")})
+    @JsonSubTypes.Type(
+        value = StreamingJob.class,
+        name = "streaming"),
+    @JsonSubTypes.Type(
+        value = VirtualJob.class,
+        name = "virtual")})
 @DiscriminatorColumn(name = "type")
 public abstract class AbstractJob extends AbstractAuditableEntity {
     private static final long serialVersionUID = 7569493377868453677L;
 
     private static final Logger LOGGER = LoggerFactory
-            .getLogger(AbstractJob.class);
+        .getLogger(AbstractJob.class);
 
     protected Long measureId;
 
@@ -107,7 +107,7 @@ public abstract class AbstractJob extends AbstractAuditableEntity {
 
     @NotNull
     @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST,
-            CascadeType.REMOVE, CascadeType.MERGE})
+        CascadeType.REMOVE, CascadeType.MERGE})
     @JoinColumn(name = "job_id")
     private List<JobDataSegment> segments = new ArrayList<>();
 
@@ -236,8 +236,8 @@ public abstract class AbstractJob extends AbstractAuditableEntity {
     public void load() throws IOException {
         if (!StringUtils.isEmpty(predicateConfig)) {
             this.configMap = JsonUtil.toEntity(predicateConfig,
-                    new TypeReference<Map<String, Object>>() {
-                    });
+                new TypeReference<Map<String, Object>>() {
+                });
         }
     }
 
@@ -265,7 +265,6 @@ public abstract class AbstractJob extends AbstractAuditableEntity {
         this.deleted = deleted;
     }
 
-
     AbstractJob(String jobName, Long measureId, String metricName) {
         this.jobName = jobName;
         this.measureId = measureId;
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
index 2cc394f..2272e14 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
@@ -39,7 +39,7 @@ public class JobDataSegment extends AbstractAuditableEntity {
     private static final long serialVersionUID = -9056531122243340484L;
 
     private static final Logger LOGGER = LoggerFactory
-            .getLogger(JobDataSegment.class);
+        .getLogger(JobDataSegment.class);
 
     @NotNull
     private String dataConnectorName;
@@ -47,7 +47,7 @@ public class JobDataSegment extends AbstractAuditableEntity {
     private boolean asTsBaseline = false;
 
     @OneToOne(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST,
-            CascadeType.REMOVE, CascadeType.MERGE})
+        CascadeType.REMOVE, CascadeType.MERGE})
     @JoinColumn(name = "segment_range_id")
     private SegmentRange segmentRange = new SegmentRange();
 
@@ -77,7 +77,7 @@ public class JobDataSegment extends AbstractAuditableEntity {
     public void setDataConnectorName(String dataConnectorName) {
         if (StringUtils.isEmpty(dataConnectorName)) {
             LOGGER.warn(" Data connector name is invalid. " +
-                    "Please check your connector name.");
+                "Please check your connector name.");
             throw new NullPointerException();
         }
         this.dataConnectorName = dataConnectorName;
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java
index ef0b139..23b70a4 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java
@@ -203,7 +203,7 @@ public class JobInstanceBean extends AbstractAuditableEntity {
         this.state = state;
         this.tms = tms;
         this.expireTms = expireTms;
-        this.appId=appId;
+        this.appId = appId;
     }
 
     public JobInstanceBean(State state, Long tms, Long expireTms) {
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java b/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
index de122f5..2e892f6 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
@@ -127,7 +127,7 @@ public class LivySessionStates {
 
     public static boolean isActive(State state) {
         if (UNKNOWN.equals(state) || STOPPED.equals(state) || NOT_FOUND.equals
-                (state) || FOUND.equals(state)) {
+            (state) || FOUND.equals(state)) {
             // set UNKNOWN isActive() as false.
             return false;
         } else if (FINDING.equals(state)) {
@@ -143,8 +143,8 @@ public class LivySessionStates {
             return "COMPLETE";
         }
         if (UNKNOWN.equals(state) || NOT_FOUND.equals(state)
-                || FOUND.equals(state) || sessionState == null
-                || !sessionState.isActive()) {
+            || FOUND.equals(state) || sessionState == null
+            || !sessionState.isActive()) {
             return "ERROR";
         }
         return "NORMAL";
@@ -153,9 +153,9 @@ public class LivySessionStates {
 
     public static boolean isHealthy(State state) {
         return !(State.ERROR.equals(state) || State.DEAD.equals(state)
-                || State.SHUTTING_DOWN.equals(state)
-                || State.FINDING.equals(state)
-                || State.NOT_FOUND.equals(state)
-                || State.FOUND.equals(state));
+            || State.SHUTTING_DOWN.equals(state)
+            || State.FINDING.equals(state)
+            || State.NOT_FOUND.equals(state)
+            || State.FOUND.equals(state));
     }
 }
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
index 138ab0f..843ad43 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
@@ -17,7 +17,6 @@ specific language governing permissions and limitations
 under the License.
 */
 
-
 package org.apache.griffin.core.job.entity;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -87,16 +86,16 @@ public class SegmentPredicate extends AbstractAuditableEntity {
     public void load() throws IOException {
         if (!StringUtils.isEmpty(config)) {
             this.configMap = JsonUtil.toEntity(config,
-                    new TypeReference<Map<String, Object>>() {
-                    });
+                new TypeReference<Map<String, Object>>() {
+                });
         }
     }
 
     public SegmentPredicate() {
     }
 
-    public SegmentPredicate(String type, Map<String, String> configMap) throws
-            JsonProcessingException {
+    public SegmentPredicate(String type, Map<String, String> configMap)
+        throws JsonProcessingException {
         this.type = type;
         this.config = JsonUtil.toJson(configMap);
     }
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java
index f9fcc72..a486f8c 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java
@@ -17,7 +17,6 @@ specific language governing permissions and limitations
 under the License.
 */
 
-
 package org.apache.griffin.core.job.entity;
 
 import javax.persistence.Column;
@@ -35,7 +34,6 @@ public class SegmentRange extends AbstractAuditableEntity {
 
     private String length = "1h";
 
-
     public String getBegin() {
         return begin;
     }
diff --git a/service/src/main/java/org/apache/griffin/core/job/factory/AutowiringSpringBeanJobFactory.java b/service/src/main/java/org/apache/griffin/core/job/factory/AutowiringSpringBeanJobFactory.java
index 8bb3d60..78d1d78 100644
--- a/service/src/main/java/org/apache/griffin/core/job/factory/AutowiringSpringBeanJobFactory.java
+++ b/service/src/main/java/org/apache/griffin/core/job/factory/AutowiringSpringBeanJobFactory.java
@@ -27,10 +27,18 @@ import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationContextAware;
 import org.springframework.scheduling.quartz.SpringBeanJobFactory;
 
+/**
+ * Auto-wiring SpringBeanJobFactory is special  BeanJobFactory that adds auto-wiring support against
+ * {@link SpringBeanJobFactory} allowing you to inject properties from the scheduler context, job data map
+ * and trigger data entries into the job bean.
+ *
+ * @see SpringBeanJobFactory
+ * @see ApplicationContextAware
+ */
 public final class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory
-        implements ApplicationContextAware {
+    implements ApplicationContextAware {
     private static final Logger LOGGER = LoggerFactory
-            .getLogger(AutowiringSpringBeanJobFactory.class);
+        .getLogger(AutowiringSpringBeanJobFactory.class);
 
     private transient AutowireCapableBeanFactory beanFactory;
 
@@ -41,12 +49,10 @@ public final class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory
 
     @Override
     protected Object createJobInstance(final TriggerFiredBundle bundle) {
-
         try {
             final Object job = super.createJobInstance(bundle);
             beanFactory.autowireBean(job);
             return job;
-
         } catch (Exception e) {
             LOGGER.error("fail to create job instance. {}", e);
         }
diff --git a/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java b/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
index 7671a42..7f16ef7 100644
--- a/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
+++ b/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
 
 public class PredicatorFactory {
     private static final Logger LOGGER = LoggerFactory
-            .getLogger(PredicatorFactory.class);
+        .getLogger(PredicatorFactory.class);
 
     public static Predicator newPredicateInstance(SegmentPredicate segPredicate) {
         Predicator predicate;
@@ -63,7 +63,7 @@ public class PredicatorFactory {
             throw new GriffinException.ServiceException(message, e);
         } catch (NoSuchMethodException e) {
             String message = "For predicate with type " + predicateClassName +
-                    " constructor with parameter of type " + SegmentPredicate.class.getName() + " not found";
+                " constructor with parameter of type " + SegmentPredicate.class.getName() + " not found";
             LOGGER.error(message, e);
             throw new GriffinException.ServiceException(message, e);
         } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
index 218717a..4db291d 100644
--- a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
+++ b/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
@@ -30,7 +30,7 @@ import org.springframework.data.repository.CrudRepository;
 import org.springframework.transaction.annotation.Transactional;
 
 public interface JobInstanceRepo
-        extends CrudRepository<JobInstanceBean, Long> {
+    extends CrudRepository<JobInstanceBean, Long> {
 
     JobInstanceBean findByPredicateName(String name);
 
@@ -48,7 +48,7 @@ public interface JobInstanceRepo
     @Transactional(rollbackFor = Exception.class)
     @Modifying
     @Query("delete from JobInstanceBean j " +
-            "where j.expireTms <= ?1 and j.deleted = false ")
+        "where j.expireTms <= ?1 and j.deleted = false ")
     int deleteByExpireTimestamp(Long expireTms);
 
     @Query("select DISTINCT s from JobInstanceBean s where s.state in ?1")
diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobRepo.java
index 75f8048..a9bfbe2 100644
--- a/service/src/main/java/org/apache/griffin/core/job/repo/JobRepo.java
+++ b/service/src/main/java/org/apache/griffin/core/job/repo/JobRepo.java
@@ -28,7 +28,7 @@ import org.springframework.data.repository.CrudRepository;
 public interface JobRepo<T extends AbstractJob> extends CrudRepository<T, Long> {
 
     @Query("select count(j) from #{#entityName} j " +
-            "where j.jobName = ?1 and j.deleted = ?2")
+        "where j.jobName = ?1 and j.deleted = ?2")
     int countByJobNameAndDeleted(String jobName, Boolean deleted);
 
     List<T> findByDeleted(boolean deleted);
diff --git a/service/src/main/java/org/apache/griffin/core/login/LoginController.java b/service/src/main/java/org/apache/griffin/core/login/LoginController.java
index 3d08551..51b4cae 100644
--- a/service/src/main/java/org/apache/griffin/core/login/LoginController.java
+++ b/service/src/main/java/org/apache/griffin/core/login/LoginController.java
@@ -37,7 +37,7 @@ public class LoginController {
 
     @RequestMapping(value = "/authenticate", method = RequestMethod.POST)
     public ResponseEntity<Map<String, Object>> login(
-            @RequestBody Map<String, String> map) {
+        @RequestBody Map<String, String> map) {
         return loginService.login(map);
     }
 }
diff --git a/service/src/main/java/org/apache/griffin/core/login/LoginService.java b/service/src/main/java/org/apache/griffin/core/login/LoginService.java
index 11c3a3d..3c78bb1 100644
--- a/service/src/main/java/org/apache/griffin/core/login/LoginService.java
+++ b/service/src/main/java/org/apache/griffin/core/login/LoginService.java
@@ -23,6 +23,14 @@ import java.util.Map;
 
 import org.springframework.http.ResponseEntity;
 
+/**
+ * LoginService defines an abstract validation method for login action, you can implement
+ * it to customize authentication business.
+ *
+ * @see org.apache.griffin.core.config.LoginConfig
+ * @see LoginServiceDefaultImpl
+ * @see LoginServiceLdapImpl
+ */
 public interface LoginService {
 
     ResponseEntity<Map<String, Object>> login(Map<String, String> map);
diff --git a/service/src/main/java/org/apache/griffin/core/login/LoginServiceDefaultImpl.java b/service/src/main/java/org/apache/griffin/core/login/LoginServiceDefaultImpl.java
index fc54ea4..fc4041c 100644
--- a/service/src/main/java/org/apache/griffin/core/login/LoginServiceDefaultImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/login/LoginServiceDefaultImpl.java
@@ -34,10 +34,9 @@ public class LoginServiceDefaultImpl implements LoginService {
         if (StringUtils.isBlank(username)) {
             username = "Anonymous";
         }
-        String fullName = username;
         Map<String, Object> message = new HashMap<>();
         message.put("ntAccount", username);
-        message.put("fullName", fullName);
+        message.put("fullName", username);
         message.put("status", 0);
         return new ResponseEntity<>(message, HttpStatus.OK);
     }
diff --git a/service/src/main/java/org/apache/griffin/core/login/LoginServiceLdapImpl.java b/service/src/main/java/org/apache/griffin/core/login/LoginServiceLdapImpl.java
index 56f6765..1389d12 100644
--- a/service/src/main/java/org/apache/griffin/core/login/LoginServiceLdapImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/login/LoginServiceLdapImpl.java
@@ -43,10 +43,10 @@ import org.springframework.http.ResponseEntity;
 
 public class LoginServiceLdapImpl implements LoginService {
     private static final Logger LOGGER = LoggerFactory.getLogger
-            (LoginServiceLdapImpl.class);
+        (LoginServiceLdapImpl.class);
 
     private static final String LDAP_FACTORY =
-            "com.sun.jndi.ldap.LdapCtxFactory";
+        "com.sun.jndi.ldap.LdapCtxFactory";
 
     private String url;
     private String email;
@@ -86,7 +86,7 @@ public class LoginServiceLdapImpl implements LoginService {
             ctx = getContextInstance(toPrincipal(bindAccount), bindPassword);
 
             NamingEnumeration<SearchResult> results = ctx.search(searchBase,
-                    searchFilter, searchControls);
+                searchFilter, searchControls);
             SearchResult userObject = getSingleUser(results);
 
             // verify password if different bind user is used
@@ -102,7 +102,7 @@ public class LoginServiceLdapImpl implements LoginService {
             return new ResponseEntity<>(message, HttpStatus.OK);
         } catch (AuthenticationException e) {
             LOGGER.warn("User {} failed to login with LDAP auth. {}", username,
-                    e.getMessage());
+                e.getMessage());
         } catch (NamingException e) {
             LOGGER.warn(String.format("User %s failed to login with LDAP auth.", username), e);
         } finally {
@@ -129,7 +129,7 @@ public class LoginServiceLdapImpl implements LoginService {
         if (results.hasMoreElements()) {
             SearchResult second = results.nextElement();
             throw new NamingException(String.format("Ambiguous search, found two users: %s, %s",
-                    result.getNameInNamespace(), second.getNameInNamespace()));
+                result.getNameInNamespace(), second.getNameInNamespace()));
         }
         return result;
     }
@@ -161,7 +161,7 @@ public class LoginServiceLdapImpl implements LoginService {
             }
         } catch (NamingException e) {
             LOGGER.warn("User {} successfully login with LDAP auth, " +
-                    "but failed to get full name.", ntAccount);
+                "but failed to get full name.", ntAccount);
             return ntAccount;
         }
     }
@@ -175,7 +175,7 @@ public class LoginServiceLdapImpl implements LoginService {
     }
 
     private LdapContext getContextInstance(String principal, String password)
-            throws NamingException {
+        throws NamingException {
         Hashtable<String, String> ht = new Hashtable<>();
         ht.put(Context.INITIAL_CONTEXT_FACTORY, LDAP_FACTORY);
         ht.put(Context.PROVIDER_URL, url);
diff --git a/service/src/main/java/org/apache/griffin/core/login/ldap/SelfSignedSocketFactory.java b/service/src/main/java/org/apache/griffin/core/login/ldap/SelfSignedSocketFactory.java
index 187365f..028d878 100644
--- a/service/src/main/java/org/apache/griffin/core/login/ldap/SelfSignedSocketFactory.java
+++ b/service/src/main/java/org/apache/griffin/core/login/ldap/SelfSignedSocketFactory.java
@@ -35,7 +35,7 @@ import org.apache.griffin.core.exception.GriffinException;
 
 /**
  * SocketFactory ignoring insecure (self-signed, expired) certificates.
- *
+ * <p>
  * Maintains internal {@code SSLSocketFactory} configured with {@code NoopTrustManager}.
  * All SocketFactory methods are proxied to internal SSLSocketFactory instance.
  * Accepts all client and server certificates, from any issuers.
diff --git a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperatorImpl.java
index c941f28..dda373a 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperatorImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperatorImpl.java
@@ -55,9 +55,9 @@ public class ExternalMeasureOperatorImpl implements MeasureOperator {
         ExternalMeasure latestMeasure = (ExternalMeasure) measure;
         validateMeasure(latestMeasure);
         ExternalMeasure originMeasure = measureRepo.findOne(
-                latestMeasure.getId());
+            latestMeasure.getId());
         VirtualJob vj = genVirtualJob(latestMeasure,
-                originMeasure.getVirtualJob());
+            originMeasure.getVirtualJob());
         latestMeasure.setVirtualJob(vj);
         measure = measureRepo.save(latestMeasure);
         return measure;
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
index 3c5e0b0..a6921bf 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
@@ -42,7 +42,7 @@ public class MeasureController {
 
     @RequestMapping(value = "/measures", method = RequestMethod.GET)
     public List<? extends Measure> getAllAliveMeasures(@RequestParam(value =
-            "type", defaultValue = "") String type) {
+        "type", defaultValue = "") String type) {
         return measureService.getAllAliveMeasures(type);
     }
 
@@ -54,7 +54,7 @@ public class MeasureController {
     @RequestMapping(value = "/measures/{id}", method = RequestMethod.DELETE)
     @ResponseStatus(HttpStatus.NO_CONTENT)
     public void deleteMeasureById(@PathVariable("id") Long id) throws
-            SchedulerException {
+        SchedulerException {
         measureService.deleteMeasureById(id);
     }
 
@@ -71,7 +71,7 @@ public class MeasureController {
     }
 
     @RequestMapping(value = "/measures/owner/{owner}", method =
-            RequestMethod.GET)
+        RequestMethod.GET)
     public List<Measure> getAliveMeasuresByOwner(@PathVariable("owner")
                                                  @Valid String owner) {
         return measureService.getAliveMeasuresByOwner(owner);
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java
index accf1a5..4801ec2 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java
@@ -32,6 +32,6 @@ public interface MeasureOrgService {
 
     Map<String, Map<String, List<Map<String, Object>>>>
     getMeasureWithJobDetailsGroupByOrg(Map<String,
-            List<Map<String, Object>>>
-                                               jobDetailsGroupByMeasure);
+        List<Map<String, Object>>>
+                                           jobDetailsGroupByMeasure);
 }
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java
index 64ec06a..13344c9 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java
@@ -50,7 +50,7 @@ public class MeasureOrgServiceImpl implements MeasureOrgService {
         List<String> orgs = measureRepo.findNameByOrganization(org, false);
         if (CollectionUtils.isEmpty(orgs)) {
             throw new GriffinException.NotFoundException
-                    (ORGANIZATION_NAME_DOES_NOT_EXIST);
+                (ORGANIZATION_NAME_DOES_NOT_EXIST);
         }
         return orgs;
     }
@@ -64,7 +64,7 @@ public class MeasureOrgServiceImpl implements MeasureOrgService {
             orgName = orgName == null ? "null" : orgName;
             String measureName = measure.getName();
             List<String> measureList = orgWithMetricsMap.getOrDefault(orgName,
-                    new ArrayList<>());
+                new ArrayList<>());
             measureList.add(measureName);
             orgWithMetricsMap.put(orgName, measureList);
         }
@@ -74,9 +74,9 @@ public class MeasureOrgServiceImpl implements MeasureOrgService {
     @Override
     public Map<String, Map<String, List<Map<String, Object>>>>
     getMeasureWithJobDetailsGroupByOrg(Map<String,
-            List<Map<String, Object>>> jobDetails) {
+        List<Map<String, Object>>> jobDetails) {
         Map<String, Map<String, List<Map<String, Object>>>> result =
-                new HashMap<>();
+            new HashMap<>();
         List<GriffinMeasure> measures = measureRepo.findByDeleted(false);
         if (measures == null) {
             return null;
@@ -86,9 +86,9 @@ public class MeasureOrgServiceImpl implements MeasureOrgService {
             String measureName = measure.getName();
             String measureId = measure.getId().toString();
             List<Map<String, Object>> jobList = jobDetails
-                    .getOrDefault(measureId, new ArrayList<>());
+                .getOrDefault(measureId, new ArrayList<>());
             Map<String, List<Map<String, Object>>> measureWithJobs = result
-                    .getOrDefault(orgName, new HashMap<>());
+                .getOrDefault(orgName, new HashMap<>());
             measureWithJobs.put(measureName, jobList);
             result.put(orgName, measureWithJobs);
         }
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
index 1ea945e..97e8117 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
@@ -45,7 +45,7 @@ import org.springframework.util.CollectionUtils;
 @Service
 public class MeasureServiceImpl implements MeasureService {
     private static final Logger LOGGER = LoggerFactory
-            .getLogger(MeasureServiceImpl.class);
+        .getLogger(MeasureServiceImpl.class);
     private static final String GRIFFIN = "griffin";
     private static final String EXTERNAL = "external";
 
@@ -77,7 +77,7 @@ public class MeasureServiceImpl implements MeasureService {
         Measure measure = measureRepo.findByIdAndDeleted(id, false);
         if (measure == null) {
             throw new GriffinException
-                    .NotFoundException(MEASURE_ID_DOES_NOT_EXIST);
+                .NotFoundException(MEASURE_ID_DOES_NOT_EXIST);
         }
         return measure;
     }
@@ -90,12 +90,12 @@ public class MeasureServiceImpl implements MeasureService {
     @Override
     public Measure createMeasure(Measure measure) {
         List<Measure> aliveMeasureList = measureRepo
-                .findByNameAndDeleted(measure.getName(), false);
+            .findByNameAndDeleted(measure.getName(), false);
         if (!CollectionUtils.isEmpty(aliveMeasureList)) {
             LOGGER.warn("Failed to create new measure {}, it already exists.",
-                    measure.getName());
+                measure.getName());
             throw new GriffinException.ConflictException(
-                    MEASURE_NAME_ALREADY_EXIST);
+                MEASURE_NAME_ALREADY_EXIST);
         }
         MeasureOperator op = getOperation(measure);
         return op.create(measure);
@@ -106,12 +106,12 @@ public class MeasureServiceImpl implements MeasureService {
         Measure m = measureRepo.findByIdAndDeleted(measure.getId(), false);
         if (m == null) {
             throw new GriffinException.NotFoundException(
-                    MEASURE_ID_DOES_NOT_EXIST);
+                MEASURE_ID_DOES_NOT_EXIST);
         }
         if (!m.getType().equals(measure.getType())) {
             LOGGER.warn("Can't update measure to different type.");
             throw new GriffinException.BadRequestException(
-                    MEASURE_TYPE_DOES_NOT_MATCH);
+                MEASURE_TYPE_DOES_NOT_MATCH);
         }
         MeasureOperator op = getOperation(measure);
         return op.update(measure);
@@ -122,7 +122,7 @@ public class MeasureServiceImpl implements MeasureService {
         Measure measure = measureRepo.findByIdAndDeleted(measureId, false);
         if (measure == null) {
             throw new GriffinException.NotFoundException(
-                    MEASURE_ID_DOES_NOT_EXIST);
+                MEASURE_ID_DOES_NOT_EXIST);
         }
         MeasureOperator op = getOperation(measure);
         op.delete(measure);
@@ -144,7 +144,7 @@ public class MeasureServiceImpl implements MeasureService {
             return externalOp;
         }
         throw new GriffinException.BadRequestException(
-                MEASURE_TYPE_DOES_NOT_SUPPORT);
+            MEASURE_TYPE_DOES_NOT_SUPPORT);
     }
 
 }
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/AbstractAuditableEntity.java b/service/src/main/java/org/apache/griffin/core/measure/entity/AbstractAuditableEntity.java
index cb5a399..5bcf9fa 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/AbstractAuditableEntity.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/AbstractAuditableEntity.java
@@ -28,6 +28,10 @@ import javax.persistence.GenerationType;
 import javax.persistence.Id;
 import javax.persistence.MappedSuperclass;
 
+/**
+ * AbstractAuditableEntity is base entity class definition in Apache Griffin, all
+ * {@link javax.persistence.Entity} classes should extend it.
+ */
 @MappedSuperclass
 public abstract class AbstractAuditableEntity implements Serializable {
 
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
index f0a0038..970977b 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
@@ -52,7 +52,7 @@ public class DataSource extends AbstractAuditableEntity {
     private String name;
 
     @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST,
-            CascadeType.REMOVE, CascadeType.MERGE})
+        CascadeType.REMOVE, CascadeType.MERGE})
     @JoinColumn(name = "data_source_id")
     private List<DataConnector> connectors = new ArrayList<>();
 
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java b/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java
index 9584800..3107852 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java
@@ -34,7 +34,7 @@ public class EvaluateRule extends AbstractAuditableEntity {
     private static final long serialVersionUID = 4240072518233967528L;
 
     @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST,
-            CascadeType.REMOVE, CascadeType.MERGE})
+        CascadeType.REMOVE, CascadeType.MERGE})
     @JoinColumn(name = "evaluate_rule_id")
     private List<Rule> rules = new ArrayList<>();
 
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
index f75e9d6..9949949 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
@@ -47,10 +47,10 @@ import org.apache.griffin.core.util.JsonUtil;
 @Entity
 @Inheritance(strategy = InheritanceType.JOINED)
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY,
-        property = "measure.type")
+    property = "measure.type")
 @JsonSubTypes({
-        @JsonSubTypes.Type(value = GriffinMeasure.class, name = "griffin"),
-        @JsonSubTypes.Type(value = ExternalMeasure.class, name = "external")})
+    @JsonSubTypes.Type(value = GriffinMeasure.class, name = "griffin"),
+    @JsonSubTypes.Type(value = ExternalMeasure.class, name = "external")})
 public abstract class Measure extends AbstractAuditableEntity {
     private static final long serialVersionUID = -4748881017029815714L;
 
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
index 3792e4d..de3a9b2 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
@@ -186,12 +186,12 @@ public class Rule extends AbstractAuditableEntity {
         if (!StringUtils.isEmpty(details)) {
             this.detailsMap = JsonUtil.toEntity(
                 details, new TypeReference<Map<String, Object>>() {
-            });
+                });
         }
         if (!StringUtils.isEmpty(out)) {
             this.outList = JsonUtil.toEntity(
                 out, new TypeReference<List<Map<String, Object>>>() {
-            });
+                });
         }
     }
 
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java b/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java
index 963304b..1387c7e 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java
@@ -122,8 +122,8 @@ public class StreamingPreProcess extends AbstractAuditableEntity {
     public void load() throws IOException {
         if (!StringUtils.isEmpty(details)) {
             this.detailsMap = JsonUtil.toEntity(details,
-                    new TypeReference<Map<String, Object>>() {
-                    });
+                new TypeReference<Map<String, Object>>() {
+                });
         }
     }
 
diff --git a/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java b/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java
index eb24b1d..a96415c 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java
@@ -32,7 +32,7 @@ import org.springframework.data.repository.CrudRepository;
  * @param <T> Measure and its subclass
  */
 public interface MeasureRepo<T extends Measure>
-        extends CrudRepository<T, Long> {
+    extends CrudRepository<T, Long> {
 
     /**
      * search repository by name and deletion state
@@ -76,7 +76,7 @@ public interface MeasureRepo<T extends Measure>
      * @return organization collection
      */
     @Query("select DISTINCT m.organization from #{#entityName} m "
-            + "where m.deleted = ?1 and m.organization is not null")
+        + "where m.deleted = ?1 and m.organization is not null")
     List<String> findOrganizations(Boolean deleted);
 
     /**
@@ -87,6 +87,6 @@ public interface MeasureRepo<T extends Measure>
      * @return organization collection
      */
     @Query("select m.name from #{#entityName} m "
-            + "where m.organization= ?1 and m.deleted= ?2")
+        + "where m.organization= ?1 and m.deleted= ?2")
     List<String> findNameByOrganization(String organization, Boolean deleted);
 }
diff --git a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreProxy.java b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreProxy.java
index adb71e9..f9c1236 100644
--- a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreProxy.java
+++ b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreProxy.java
@@ -33,7 +33,7 @@ import org.springframework.stereotype.Component;
 @Component
 public class HiveMetaStoreProxy {
     private static final Logger LOGGER = LoggerFactory
-            .getLogger(HiveMetaStoreProxy.class);
+        .getLogger(HiveMetaStoreProxy.class);
 
     @Value("${hive.metastore.uris}")
     private String uris;
@@ -60,7 +60,7 @@ public class HiveMetaStoreProxy {
         HiveConf hiveConf = new HiveConf();
         hiveConf.set("hive.metastore.local", "false");
         hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES,
-                3);
+            3);
         hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, uris);
         hiveConf.setIntVar(HiveConf.ConfVars.HMSHANDLERATTEMPTS, attempts);
         hiveConf.setVar(HiveConf.ConfVars.HMSHANDLERINTERVAL, interval);
diff --git a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java
index a12dc08..d855183 100644
--- a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java
@@ -47,7 +47,7 @@ import org.springframework.util.StringUtils;
 public class HiveMetaStoreServiceImpl implements HiveMetaStoreService {
 
     private static final Logger LOGGER = LoggerFactory
-            .getLogger(HiveMetaStoreService.class);
+        .getLogger(HiveMetaStoreService.class);
 
     @Autowired
     private IMetaStoreClient client = null;
@@ -70,7 +70,7 @@ public class HiveMetaStoreServiceImpl implements HiveMetaStoreService {
         try {
             if (client == null) {
                 LOGGER.warn("Hive client is null. " +
-                        "Please check your hive config.");
+                    "Please check your hive config.");
                 return new ArrayList<>();
             }
             results = client.getAllDatabases();
@@ -89,7 +89,7 @@ public class HiveMetaStoreServiceImpl implements HiveMetaStoreService {
         try {
             if (client == null) {
                 LOGGER.warn("Hive client is null. " +
-                        "Please check your hive config.");
+                    "Please check your hive config.");
                 return new ArrayList<>();
             }
             results = client.getAllTables(getUseDbName(dbName));
@@ -112,7 +112,7 @@ public class HiveMetaStoreServiceImpl implements HiveMetaStoreService {
     @Cacheable(unless = "#result==null || #result.isEmpty()")
     public Map<String, List<String>> getAllTableNames() {
         Map<String, List<String>> result = new HashMap<>();
-        for (String dbName: getAllDatabases()) {
+        for (String dbName : getAllDatabases()) {
             result.put(dbName, Lists.newArrayList(getAllTableNames(dbName)));
         }
         return result;
@@ -143,30 +143,30 @@ public class HiveMetaStoreServiceImpl implements HiveMetaStoreService {
 
 
     @Override
-    @Cacheable(unless="#result==null")
+    @Cacheable(unless = "#result==null")
     public Table getTable(String dbName, String tableName) {
         Table result = null;
         try {
             if (client == null) {
                 LOGGER.warn("Hive client is null. " +
-                        "Please check your hive config.");
+                    "Please check your hive config.");
                 return null;
             }
             result = client.getTable(getUseDbName(dbName), tableName);
         } catch (Exception e) {
             reconnect();
             LOGGER.error("Exception fetching table info : {}. {}", tableName,
-                    e);
+                e);
         }
         return result;
     }
 
     @Scheduled(fixedRateString =
-            "${cache.evict.hive.fixedRate.in.milliseconds}")
+        "${cache.evict.hive.fixedRate.in.milliseconds}")
     @CacheEvict(
-            cacheNames = "hive",
-            allEntries = true,
-            beforeInvocation = true)
+        cacheNames = "hive",
+        allEntries = true,
+        beforeInvocation = true)
     public void evictHiveCache() {
         LOGGER.info("Evict hive cache");
         // TODO: calls within same bean are not cached -- this call is not populating anything
@@ -182,7 +182,7 @@ public class HiveMetaStoreServiceImpl implements HiveMetaStoreService {
         try {
             if (client == null) {
                 LOGGER.warn("Hive client is null. " +
-                        "Please check your hive config.");
+                    "Please check your hive config.");
                 return allTables;
             }
             Iterable<String> tables = client.getAllTables(useDbName);
diff --git a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceJdbcImpl.java b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceJdbcImpl.java
index dcc83e7..0df028f 100644
--- a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceJdbcImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceJdbcImpl.java
@@ -55,7 +55,7 @@ import org.springframework.stereotype.Service;
 public class HiveMetaStoreServiceJdbcImpl implements HiveMetaStoreService {
 
     private static final Logger LOGGER = LoggerFactory
-            .getLogger(HiveMetaStoreService.class);
+        .getLogger(HiveMetaStoreService.class);
 
     private static final String SHOW_TABLES_IN = "show tables in ";
 
@@ -190,17 +190,18 @@ public class HiveMetaStoreServiceJdbcImpl implements HiveMetaStoreService {
     }
 
     @Scheduled(fixedRateString =
-            "${cache.evict.hive.fixedRate.in.milliseconds}")
+        "${cache.evict.hive.fixedRate.in.milliseconds}")
     @CacheEvict(
-            cacheNames = "jdbcHive",
-            allEntries = true,
-            beforeInvocation = true)
+        cacheNames = "jdbcHive",
+        allEntries = true,
+        beforeInvocation = true)
     public void evictHiveCache() {
         LOGGER.info("Evict hive cache");
     }
 
     /**
      * Query Hive for Show tables or show databases, which will return List of String
+     *
      * @param sql sql string
      * @return
      */
@@ -248,6 +249,7 @@ public class HiveMetaStoreServiceJdbcImpl implements HiveMetaStoreService {
 
     /**
      * Get the Hive table location from hive table metadata string
+     *
      * @param tableMetadata hive table metadata string
      * @return Hive table location
      */
@@ -271,28 +273,28 @@ public class HiveMetaStoreServiceJdbcImpl implements HiveMetaStoreService {
     /**
      * Get the Hive table schema: column name, column type, column comment
      * The input String looks like following:
-     *
+     * <p>
      * CREATE TABLE `employee`(
-     *   `eid` int,
-     *   `name` string,
-     *   `salary` string,
-     *   `destination` string)
+     * `eid` int,
+     * `name` string,
+     * `salary` string,
+     * `destination` string)
      * COMMENT 'Employee details'
      * ROW FORMAT SERDE
-     *   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+     * 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
      * WITH SERDEPROPERTIES (
-     *   'field.delim'='\t',
-     *   'line.delim'='\n',
-     *   'serialization.format'='\t')
+     * 'field.delim'='\t',
+     * 'line.delim'='\n',
+     * 'serialization.format'='\t')
      * STORED AS INPUTFORMAT
-     *   'org.apache.hadoop.mapred.TextInputFormat'
+     * 'org.apache.hadoop.mapred.TextInputFormat'
      * OUTPUTFORMAT
-     *   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+     * 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
      * LOCATION
-     *   'file:/user/hive/warehouse/employee'
+     * 'file:/user/hive/warehouse/employee'
      * TBLPROPERTIES (
-     *   'bucketing_version'='2',
-     *   'transient_lastDdlTime'='1562086077')
+     * 'bucketing_version'='2',
+     * 'transient_lastDdlTime'='1562086077')
      *
      * @param tableMetadata hive table metadata string
      * @return List of FieldSchema
@@ -316,9 +318,9 @@ public class HiveMetaStoreServiceJdbcImpl implements HiveMetaStoreService {
 
     /**
      * Parse one column string
-     *
+     * <p>
      * Input example:
-     *  `merch_date` string COMMENT 'this is merch process date'
+     * `merch_date` string COMMENT 'this is merch process date'
      *
      * @param colStr column string
      * @return
diff --git a/service/src/main/java/org/apache/griffin/core/metastore/kafka/KafkaSchemaController.java b/service/src/main/java/org/apache/griffin/core/metastore/kafka/KafkaSchemaController.java
index 732a59e..63df077 100644
--- a/service/src/main/java/org/apache/griffin/core/metastore/kafka/KafkaSchemaController.java
+++ b/service/src/main/java/org/apache/griffin/core/metastore/kafka/KafkaSchemaController.java
@@ -48,7 +48,7 @@ public class KafkaSchemaController {
 
     @RequestMapping(value = "/versions", method = RequestMethod.GET)
     public Iterable<Integer> getSubjectVersions(
-            @RequestParam("subject") String subject) {
+        @RequestParam("subject") String subject) {
         return kafkaSchemaService.getSubjectVersions(subject);
     }
 
diff --git a/service/src/main/java/org/apache/griffin/core/metastore/kafka/KafkaSchemaServiceImpl.java b/service/src/main/java/org/apache/griffin/core/metastore/kafka/KafkaSchemaServiceImpl.java
index ba6bba2..89b8985 100644
--- a/service/src/main/java/org/apache/griffin/core/metastore/kafka/KafkaSchemaServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/metastore/kafka/KafkaSchemaServiceImpl.java
@@ -36,7 +36,7 @@ import org.springframework.web.client.RestTemplate;
 public class KafkaSchemaServiceImpl implements KafkaSchemaService {
 
     private static final Logger log = LoggerFactory
-            .getLogger(KafkaSchemaServiceImpl.class);
+        .getLogger(KafkaSchemaServiceImpl.class);
 
     @Value("${kafka.schema.registry.url}")
     private String url;
@@ -59,7 +59,7 @@ public class KafkaSchemaServiceImpl implements KafkaSchemaService {
         String path = "/schemas/ids/" + id;
         String regUrl = registryUrl(path);
         ResponseEntity<SchemaString> res = restTemplate.getForEntity(regUrl,
-                SchemaString.class);
+            SchemaString.class);
         SchemaString result = res.getBody();
         return result;
     }
@@ -69,7 +69,7 @@ public class KafkaSchemaServiceImpl implements KafkaSchemaService {
         String path = "/subjects";
         String regUrl = registryUrl(path);
         ResponseEntity<String[]> res = restTemplate.getForEntity(regUrl,
-                String[].class);
+            String[].class);
         Iterable<String> result = Arrays.asList(res.getBody());
         return result;
     }
@@ -79,7 +79,7 @@ public class KafkaSchemaServiceImpl implements KafkaSchemaService {
         String path = "/subjects/" + subject + "/versions";
         String regUrl = registryUrl(path);
         ResponseEntity<Integer[]> res = restTemplate.getForEntity(regUrl,
-                Integer[].class);
+            Integer[].class);
         Iterable<Integer> result = Arrays.asList(res.getBody());
         return result;
     }
@@ -89,7 +89,7 @@ public class KafkaSchemaServiceImpl implements KafkaSchemaService {
         String path = "/subjects/" + subject + "/versions/" + version;
         String regUrl = registryUrl(path);
         ResponseEntity<Schema> res = restTemplate.getForEntity(regUrl,
-                Schema.class);
+            Schema.class);
         Schema result = res.getBody();
         return result;
     }
@@ -99,7 +99,7 @@ public class KafkaSchemaServiceImpl implements KafkaSchemaService {
         String path = "/config";
         String regUrl = registryUrl(path);
         ResponseEntity<Config> res = restTemplate.getForEntity(regUrl,
-                Config.class);
+            Config.class);
         Config result = res.getBody();
         return result;
     }
@@ -109,7 +109,7 @@ public class KafkaSchemaServiceImpl implements KafkaSchemaService {
         String path = "/config/" + subject;
         String regUrl = registryUrl(path);
         ResponseEntity<Config> res = restTemplate.getForEntity(regUrl,
-                Config.class);
+            Config.class);
         Config result = res.getBody();
         return result;
     }
diff --git a/service/src/main/java/org/apache/griffin/core/metric/MetricController.java b/service/src/main/java/org/apache/griffin/core/metric/MetricController.java
index 5cd43af..23b1238 100644
--- a/service/src/main/java/org/apache/griffin/core/metric/MetricController.java
+++ b/service/src/main/java/org/apache/griffin/core/metric/MetricController.java
@@ -49,32 +49,32 @@ public class MetricController {
 
     @RequestMapping(value = "/metrics/values", method = RequestMethod.GET)
     public List<MetricValue> getMetricValues(@RequestParam("metricName")
-                                                     String metricName,
+                                                 String metricName,
                                              @RequestParam("size") int size,
                                              @RequestParam(value = "offset",
-                                                     defaultValue = "0")
-                                                     int offset,
+                                                 defaultValue = "0")
+                                                 int offset,
                                              @RequestParam(value = "tmst",
-                                                     defaultValue = "0")
-                                                     long tmst) {
+                                                 defaultValue = "0")
+                                                 long tmst) {
         return metricService.getMetricValues(metricName, offset, size, tmst);
     }
 
     @RequestMapping(value = "/metrics/values", method = RequestMethod.POST)
     public ResponseEntity<?> addMetricValues(@RequestBody List<MetricValue>
-                                                     values) {
+                                                 values) {
         return metricService.addMetricValues(values);
     }
 
     @RequestMapping(value = "/metrics/values", method = RequestMethod.DELETE)
     @ResponseStatus(HttpStatus.NO_CONTENT)
     public ResponseEntity<?> deleteMetricValues(@RequestParam("metricName")
-                                                        String metricName) {
+                                                    String metricName) {
         return metricService.deleteMetricValues(metricName);
     }
 
     @RequestMapping(value = "/metrics/values/{instanceId}", method = RequestMethod.GET)
-    public MetricValue getMetric(@PathVariable("instanceId") Long id){
+    public MetricValue getMetric(@PathVariable("instanceId") Long id) {
         return metricService.findMetric(id);
     }
 }
diff --git a/service/src/main/java/org/apache/griffin/core/metric/MetricServiceImpl.java b/service/src/main/java/org/apache/griffin/core/metric/MetricServiceImpl.java
index beca19e..886e6b9 100644
--- a/service/src/main/java/org/apache/griffin/core/metric/MetricServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/metric/MetricServiceImpl.java
@@ -54,7 +54,7 @@ import org.springframework.stereotype.Service;
 @Service
 public class MetricServiceImpl implements MetricService {
     private static final Logger LOGGER = LoggerFactory
-            .getLogger(MetricServiceImpl.class);
+        .getLogger(MetricServiceImpl.class);
 
     @Autowired
     private MeasureRepo<Measure> measureRepo;
@@ -71,9 +71,9 @@ public class MetricServiceImpl implements MetricService {
         List<AbstractJob> jobs = jobRepo.findByDeleted(false);
         List<Measure> measures = measureRepo.findByDeleted(false);
         Map<Long, Measure> measureMap = measures.stream().collect(Collectors
-                .toMap(Measure::getId, Function.identity()));
+            .toMap(Measure::getId, Function.identity()));
         Map<Long, List<AbstractJob>> jobMap = jobs.stream().collect(Collectors
-                .groupingBy(AbstractJob::getMeasureId, Collectors.toList()));
+            .groupingBy(AbstractJob::getMeasureId, Collectors.toList()));
         for (Map.Entry<Long, List<AbstractJob>> entry : jobMap.entrySet()) {
             Long measureId = entry.getKey();
             Measure measure = measureMap.get(measureId);
@@ -81,9 +81,9 @@ public class MetricServiceImpl implements MetricService {
             List<Metric> metrics = new ArrayList<>();
             for (AbstractJob job : jobList) {
                 List<MetricValue> metricValues = getMetricValues(job
-                        .getMetricName(), 0, 300, job.getCreatedDate());
+                    .getMetricName(), 0, 300, job.getCreatedDate());
                 metrics.add(new Metric(job.getMetricName(), measure.getDqType(),
-                        measure.getOwner(), metricValues));
+                    measure.getOwner(), metricValues));
             }
             metricMap.put(measure.getName(), metrics);
 
@@ -96,19 +96,19 @@ public class MetricServiceImpl implements MetricService {
                                              int size, long tmst) {
         if (offset < 0) {
             throw new GriffinException.BadRequestException
-                    (INVALID_METRIC_RECORDS_OFFSET);
+                (INVALID_METRIC_RECORDS_OFFSET);
         }
         if (size < 0) {
             throw new GriffinException.BadRequestException
-                    (INVALID_METRIC_RECORDS_SIZE);
+                (INVALID_METRIC_RECORDS_SIZE);
         }
         try {
             return metricStore.getMetricValues(metricName, offset, size, tmst);
         } catch (IOException e) {
             LOGGER.error("Failed to get metric values named {}. {}",
-                    metricName, e.getMessage());
+                metricName, e.getMessage());
             throw new GriffinException.ServiceException(
-                    "Failed to get metric values", e);
+                "Failed to get metric values", e);
         }
     }
 
@@ -123,11 +123,11 @@ public class MetricServiceImpl implements MetricService {
         } catch (JsonProcessingException e) {
             LOGGER.warn("Failed to parse metric value.", e.getMessage());
             throw new GriffinException.BadRequestException
-                    (INVALID_METRIC_VALUE_FORMAT);
+                (INVALID_METRIC_VALUE_FORMAT);
         } catch (IOException e) {
             LOGGER.error("Failed to add metric values", e);
             throw new GriffinException.ServiceException(
-                    "Failed to add metric values", e);
+                "Failed to add metric values", e);
         }
     }
 
@@ -138,19 +138,19 @@ public class MetricServiceImpl implements MetricService {
             return metricStore.deleteMetricValues(metricName);
         } catch (IOException e) {
             LOGGER.error("Failed to delete metric values named {}. {}",
-                    metricName, e.getMessage());
+                metricName, e.getMessage());
             throw new GriffinException.ServiceException(
-                    "Failed to delete metric values.", e);
+                "Failed to delete metric values.", e);
         }
     }
 
     @Override
     public MetricValue findMetric(Long id) {
         JobInstanceBean jobInstanceBean = jobInstanceRepo.findByInstanceId(id);
-        if (jobInstanceBean == null){
+        if (jobInstanceBean == null) {
             LOGGER.warn("There are no job instances with id {} ", id);
             throw new GriffinException
-                    .NotFoundException(JOB_INSTANCE_NOT_FOUND);
+                .NotFoundException(JOB_INSTANCE_NOT_FOUND);
         }
         String appId = jobInstanceBean.getAppId();
         try {
@@ -163,9 +163,9 @@ public class MetricServiceImpl implements MetricService {
 
     private void checkFormat(MetricValue value) {
         if (StringUtils.isBlank(value.getName()) || value.getTmst() == null
-                || MapUtils.isEmpty(value.getValue())) {
+            || MapUtils.isEmpty(value.getValue())) {
             throw new GriffinException.BadRequestException
-                    (INVALID_METRIC_VALUE_FORMAT);
+                (INVALID_METRIC_VALUE_FORMAT);
         }
     }
 }
diff --git a/service/src/main/java/org/apache/griffin/core/metric/MetricStore.java b/service/src/main/java/org/apache/griffin/core/metric/MetricStore.java
index f85c4db..9510cce 100644
--- a/service/src/main/java/org/apache/griffin/core/metric/MetricStore.java
+++ b/service/src/main/java/org/apache/griffin/core/metric/MetricStore.java
@@ -31,7 +31,7 @@ public interface MetricStore {
                                       long tmst) throws IOException;
 
     ResponseEntity<?> addMetricValues(List<MetricValue> metricValues)
-            throws IOException;
+        throws IOException;
 
     ResponseEntity<?> deleteMetricValues(String metricName) throws IOException;
 
diff --git a/service/src/main/java/org/apache/griffin/core/metric/MetricStoreImpl.java b/service/src/main/java/org/apache/griffin/core/metric/MetricStoreImpl.java
index bf1d680..cc4ade8 100644
--- a/service/src/main/java/org/apache/griffin/core/metric/MetricStoreImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/metric/MetricStoreImpl.java
@@ -77,8 +77,8 @@ public class MetricStoreImpl implements MetricStore {
         if (!user.isEmpty() && !password.isEmpty()) {
             String encodedAuth = buildBasicAuthString(user, password);
             Header[] requestHeaders = new Header[]{
-                    new BasicHeader(org.apache.http.HttpHeaders.AUTHORIZATION,
-                            encodedAuth)};
+                new BasicHeader(org.apache.http.HttpHeaders.AUTHORIZATION,
+                    encodedAuth)};
             builder.setDefaultHeaders(requestHeaders);
         }
         this.client = builder.build();
@@ -90,22 +90,22 @@ public class MetricStoreImpl implements MetricStore {
         this.urlPost = urlBase.concat("/_bulk");
         this.urlDelete = urlBase.concat("/_delete_by_query");
         this.indexMetaData = String.format(
-                "{ \"index\" : { \"_index\" : " +
-                        "\"%s\",\"_type\" : \"%s\" } }%n",
-                INDEX,
-                TYPE);
+            "{ \"index\" : { \"_index\" : " +
+                "\"%s\",\"_type\" : \"%s\" } }%n",
+            INDEX,
+            TYPE);
         this.mapper = new ObjectMapper();
     }
 
     @Override
     public List<MetricValue> getMetricValues(String metricName, int from,
                                              int size, long tmst)
-            throws IOException {
+        throws IOException {
         HttpEntity entity = getHttpEntityForSearch(metricName, from, size,
-                tmst);
+            tmst);
         try {
             Response response = client.performRequest("GET", urlGet,
-                    Collections.emptyMap(), entity);
+                Collections.emptyMap(), entity);
             return getMetricValuesFromResponse(response);
         } catch (ResponseException e) {
             if (e.getResponse().getStatusLine().getStatusCode() == 404) {
@@ -116,44 +116,46 @@ public class MetricStoreImpl implements MetricStore {
     }
 
     private HttpEntity getHttpEntityForSearch(String metricName, int from, int
-            size, long tmst)
-            throws JsonProcessingException {
+        size, long tmst)
+        throws JsonProcessingException {
         Map<String, Object> map = new HashMap<>();
         Map<String, Object> queryParam = new HashMap<>();
         Map<String, Object> termQuery = Collections.singletonMap("name.keyword",
-                metricName);
+            metricName);
         queryParam.put("filter", Collections.singletonMap("term", termQuery));
         Map<String, Object> sortParam = Collections
-                .singletonMap("tmst", Collections.singletonMap("order",
-                        "desc"));
+            .singletonMap("tmst", Collections.singletonMap("order",
+                "desc"));
         map.put("query", Collections.singletonMap("bool", queryParam));
         map.put("sort", sortParam);
         map.put("from", from);
         map.put("size", size);
         return new NStringEntity(JsonUtil.toJson(map),
-                ContentType.APPLICATION_JSON);
+            ContentType.APPLICATION_JSON);
     }
 
     private List<MetricValue> getMetricValuesFromResponse(Response response)
-            throws IOException {
+        throws IOException {
         List<MetricValue> metricValues = new ArrayList<>();
         JsonNode jsonNode = mapper.readTree(EntityUtils.toString(response
-                .getEntity()));
+            .getEntity()));
         if (jsonNode.hasNonNull("hits") && jsonNode.get("hits")
-                .hasNonNull("hits")) {
+            .hasNonNull("hits")) {
             for (JsonNode node : jsonNode.get("hits").get("hits")) {
                 JsonNode sourceNode = node.get("_source");
                 Map<String, Object> value = JsonUtil.toEntity(
-                        sourceNode.get("value").toString(),
-                        new TypeReference<Map<String, Object>>() {});
+                    sourceNode.get("value").toString(),
+                    new TypeReference<Map<String, Object>>() {
+                    });
                 Map<String, Object> meta = JsonUtil.toEntity(
-                        Objects.toString(sourceNode.get("metadata"), null),
-                        new TypeReference<Map<String, Object>>() {});
+                    Objects.toString(sourceNode.get("metadata"), null),
+                    new TypeReference<Map<String, Object>>() {
+                    });
                 MetricValue metricValue = new MetricValue(
-                        sourceNode.get("name").asText(),
-                        Long.parseLong(sourceNode.get("tmst").asText()),
-                        meta,
-                        value);
+                    sourceNode.get("name").asText(),
+                    Long.parseLong(sourceNode.get("tmst").asText()),
+                    meta,
+                    value);
                 metricValues.add(metricValue);
             }
         }
@@ -162,17 +164,17 @@ public class MetricStoreImpl implements MetricStore {
 
     @Override
     public ResponseEntity<?> addMetricValues(List<MetricValue> metricValues)
-            throws IOException {
+        throws IOException {
         String bulkRequestBody = getBulkRequestBody(metricValues);
         HttpEntity entity = new NStringEntity(bulkRequestBody,
-                ContentType.APPLICATION_JSON);
+            ContentType.APPLICATION_JSON);
         Response response = client.performRequest("POST", urlPost,
-                Collections.emptyMap(), entity);
+            Collections.emptyMap(), entity);
         return getResponseEntityFromResponse(response);
     }
 
     private String getBulkRequestBody(List<MetricValue> metricValues) throws
-            JsonProcessingException {
+        JsonProcessingException {
         StringBuilder bulkRequestBody = new StringBuilder();
         for (MetricValue metricValue : metricValues) {
             bulkRequestBody.append(indexMetaData);
@@ -184,38 +186,38 @@ public class MetricStoreImpl implements MetricStore {
 
     @Override
     public ResponseEntity<?> deleteMetricValues(String metricName) throws
-            IOException {
+        IOException {
         Map<String, Object> param = Collections.singletonMap("query",
-                Collections.singletonMap("term",
-                        Collections.singletonMap("name.keyword", metricName)));
+            Collections.singletonMap("term",
+                Collections.singletonMap("name.keyword", metricName)));
         HttpEntity entity = new NStringEntity(
-                JsonUtil.toJson(param),
-                ContentType.APPLICATION_JSON);
+            JsonUtil.toJson(param),
+            ContentType.APPLICATION_JSON);
         Response response = client.performRequest("POST", urlDelete,
-                Collections.emptyMap(), entity);
+            Collections.emptyMap(), entity);
         return getResponseEntityFromResponse(response);
     }
 
     private ResponseEntity<?> getResponseEntityFromResponse(Response response)
-            throws IOException {
+        throws IOException {
         String body = EntityUtils.toString(response.getEntity());
         HttpStatus status = HttpStatus.valueOf(response.getStatusLine()
-                .getStatusCode());
+            .getStatusCode());
         return new ResponseEntity<>(body, responseHeaders, status);
     }
 
     private static String buildBasicAuthString(String user, String password) {
         String auth = user + ":" + password;
         return String.format("Basic %s", Base64.getEncoder().encodeToString(
-                auth.getBytes()));
+            auth.getBytes()));
     }
 
     @Override
     public MetricValue getMetric(String applicationId) throws IOException {
         Response response = client.performRequest(
-                "GET", urlGet,
-                Collections.singletonMap(
-                        "q", "metadata.applicationId:" + applicationId));
+            "GET", urlGet,
+            Collections.singletonMap(
+                "q", "metadata.applicationId:" + applicationId));
         List<MetricValue> metricValues = getMetricValuesFromResponse(response);
         return metricValues.get(0);
     }
diff --git a/service/src/main/java/org/apache/griffin/core/metric/model/MetricValue.java b/service/src/main/java/org/apache/griffin/core/metric/model/MetricValue.java
index 4c7f386..a540c9b 100644
--- a/service/src/main/java/org/apache/griffin/core/metric/model/MetricValue.java
+++ b/service/src/main/java/org/apache/griffin/core/metric/model/MetricValue.java
@@ -89,9 +89,9 @@ public class MetricValue {
         if (o == null || getClass() != o.getClass()) return false;
         MetricValue that = (MetricValue) o;
         return Objects.equals(name, that.name) &&
-                Objects.equals(tmst, that.tmst) &&
-                Objects.equals(metadata, that.metadata) &&
-                Objects.equals(value, that.value);
+            Objects.equals(tmst, that.tmst) &&
+            Objects.equals(metadata, that.metadata) &&
+            Objects.equals(value, that.value);
     }
 
     @Override
@@ -102,7 +102,7 @@ public class MetricValue {
     @Override
     public String toString() {
         return String.format(
-                "MetricValue{name=%s, ts=%s, meta=%s, value=%s}",
-                name, tmst, metadata, value);
+            "MetricValue{name=%s, ts=%s, meta=%s, value=%s}",
+            name, tmst, metadata, value);
     }
 }
diff --git a/service/src/main/java/org/apache/griffin/core/util/FSUtil.java b/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
index b537ac6..b6985a4 100644
--- a/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
@@ -54,6 +54,7 @@ public class FSUtil {
     private static FileSystem fileSystem;
 
     private static FileSystem defaultFS = getDefaultFileSystem();
+
     private static FileSystem getDefaultFileSystem() {
         FileSystem fs = null;
         Configuration conf = new Configuration();
@@ -173,12 +174,12 @@ public class FSUtil {
     }
 
     public static InputStream getSampleInputStream(String path)
-            throws IOException {
+        throws IOException {
         checkHDFSConf();
         if (isFileExist(path)) {
             FSDataInputStream missingData = fileSystem.open(new Path(path));
             BufferedReader bufReader = new BufferedReader(
-                    new InputStreamReader(missingData, Charsets.UTF_8));
+                new InputStreamReader(missingData, Charsets.UTF_8));
             try {
                 String line = null;
                 int rowCnt = 0;
@@ -205,16 +206,16 @@ public class FSUtil {
     private static void checkHDFSConf() {
         if (getFileSystem() == null) {
             throw new NullPointerException("FileSystem is null. " +
-                    "Please check your hdfs config default name.");
+                "Please check your hdfs config default name.");
         }
     }
 
     public static String getFirstMissRecordPath(String hdfsDir)
-            throws Exception {
+        throws Exception {
         List<FileStatus> fileList = listFileStatus(hdfsDir);
         for (int i = 0; i < fileList.size(); i++) {
             if (fileList.get(i).getPath().toUri().toString().toLowerCase()
-                    .contains("missrecord")) {
+                .contains("missrecord")) {
                 return fileList.get(i).getPath().toUri().toString();
             }
         }
@@ -222,12 +223,12 @@ public class FSUtil {
     }
 
     public static InputStream getMissSampleInputStream(String path)
-            throws Exception {
+        throws Exception {
         List<String> subDirList = listSubDir(path);
         //FIXME: only handle 1-sub dir here now
         for (int i = 0; i < subDirList.size(); i++) {
             return getSampleInputStream(getFirstMissRecordPath(
-                    subDirList.get(i)));
+                subDirList.get(i)));
         }
         return getSampleInputStream(getFirstMissRecordPath(path));
     }
diff --git a/service/src/main/java/org/apache/griffin/core/util/FileUtil.java b/service/src/main/java/org/apache/griffin/core/util/FileUtil.java
index d2209f9..03efaf2 100644
--- a/service/src/main/java/org/apache/griffin/core/util/FileUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/FileUtil.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
 
 public class FileUtil {
     private static final Logger LOGGER = LoggerFactory
-            .getLogger(FileUtil.class);
+        .getLogger(FileUtil.class);
 
     public static String getFilePath(String name, String location) {
         if (StringUtils.isEmpty(location)) {
@@ -39,7 +39,7 @@ public class FileUtil {
         File[] files = file.listFiles();
         if (files == null) {
             LOGGER.warn("The external location '{}' does not exist.Read from"
-                    + "default path.", location);
+                + "default path.", location);
             return null;
         }
         return getFilePath(name, files, location);
diff --git a/service/src/main/java/org/apache/griffin/core/util/JsonUtil.java b/service/src/main/java/org/apache/griffin/core/util/JsonUtil.java
index 0fe6449..0c1ef98 100644
--- a/service/src/main/java/org/apache/griffin/core/util/JsonUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/JsonUtil.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
 
 public class JsonUtil {
     private static final Logger LOGGER = LoggerFactory
-            .getLogger(JsonUtil.class);
+        .getLogger(JsonUtil.class);
 
     public static String toJson(Object obj) throws JsonProcessingException {
         if (obj == null) {
@@ -47,30 +47,30 @@ public class JsonUtil {
     }
 
     public static String toJsonWithFormat(Object obj)
-            throws JsonProcessingException {
+        throws JsonProcessingException {
         if (obj == null) {
             LOGGER.warn("Object to be formatted cannot be empty!");
             return null;
         }
         ObjectWriter mapper = new ObjectMapper().writer()
-                .withDefaultPrettyPrinter();
+            .withDefaultPrettyPrinter();
         return mapper.writeValueAsString(obj);
     }
 
     public static <T> T toEntity(String jsonStr, Class<T> type)
-            throws IOException {
+        throws IOException {
         if (StringUtils.isEmpty(jsonStr)) {
             LOGGER.warn("Json string {} is empty!", type);
             return null;
         }
         ObjectMapper mapper = new ObjectMapper();
         mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
-                false);
+            false);
         return mapper.readValue(jsonStr, type);
     }
 
     public static <T> T toEntity(File file, TypeReference type)
-            throws IOException {
+        throws IOException {
         if (file == null) {
             LOGGER.warn("File cannot be empty!");
             return null;
@@ -80,7 +80,7 @@ public class JsonUtil {
     }
 
     public static <T> T toEntity(InputStream in, TypeReference type)
-            throws IOException {
+        throws IOException {
         if (in == null) {
             throw new NullPointerException("Input stream cannot be null.");
         }
@@ -89,7 +89,7 @@ public class JsonUtil {
     }
 
     public static <T> T toEntity(String jsonStr, TypeReference type)
-            throws IOException {
+        throws IOException {
         if (StringUtils.isEmpty(jsonStr)) {
             LOGGER.warn("Json string {} is empty!", type);
             return null;
diff --git a/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java b/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
index 2a36886..7e57d87 100644
--- a/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
@@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory;
 
 public class MeasureUtil {
     private static final Logger LOGGER = LoggerFactory
-            .getLogger(MeasureUtil.class);
+        .getLogger(MeasureUtil.class);
 
     public static void validateMeasure(Measure measure) {
         if (measure instanceof GriffinMeasure) {
@@ -56,7 +56,7 @@ public class MeasureUtil {
     private static void validateGriffinMeasure(GriffinMeasure measure) {
         if (getConnectorNamesIfValid(measure) == null) {
             throw new GriffinException.BadRequestException
-                    (INVALID_CONNECTOR_NAME);
+                (INVALID_CONNECTOR_NAME);
         }
         if (!validatePredicates(measure)) {
             throw new GriffinException.BadRequestException(INVALID_MEASURE_PREDICATE);
@@ -65,7 +65,7 @@ public class MeasureUtil {
 
     private static boolean validatePredicates(GriffinMeasure measure) {
         for (DataSource dataSource : measure.getDataSources()) {
-            for (DataConnector dataConnector: dataSource.getConnectors()) {
+            for (DataConnector dataConnector : dataSource.getConnectors()) {
                 for (SegmentPredicate segmentPredicate : dataConnector.getPredicates()) {
                     try {
                         PredicatorFactory.newPredicateInstance(segmentPredicate);
@@ -81,7 +81,7 @@ public class MeasureUtil {
     private static void validateExternalMeasure(ExternalMeasure measure) {
         if (StringUtils.isBlank(measure.getMetricName())) {
             LOGGER.warn("Failed to create external measure {}. " +
-                    "Its metric name is blank.", measure.getName());
+                "Its metric name is blank.", measure.getName());
             throw new GriffinException.BadRequestException(MISSING_METRIC_NAME);
         }
     }
@@ -91,7 +91,7 @@ public class MeasureUtil {
         List<DataSource> sources = measure.getDataSources();
         for (DataSource source : sources) {
             source.getConnectors().stream().filter(dc -> dc.getName() != null)
-                    .forEach(dc -> sets.add(dc.getName()));
+                .forEach(dc -> sets.add(dc.getName()));
         }
         if (sets.size() == 0 || sets.size() < sources.size()) {
             LOGGER.warn("Connector names cannot be repeated or empty.");
diff --git a/service/src/main/java/org/apache/griffin/core/util/PropertiesUtil.java b/service/src/main/java/org/apache/griffin/core/util/PropertiesUtil.java
index 65f1707..6e5a8f7 100644
--- a/service/src/main/java/org/apache/griffin/core/util/PropertiesUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/PropertiesUtil.java
@@ -35,7 +35,7 @@ import org.springframework.core.io.Resource;
 
 public class PropertiesUtil {
     private static final Logger LOGGER = LoggerFactory.getLogger(
-            PropertiesUtil.class);
+        PropertiesUtil.class);
 
     public static Properties getProperties(String path, Resource resource) {
         PropertiesFactoryBean propFactoryBean = new PropertiesFactoryBean();
@@ -61,7 +61,7 @@ public class PropertiesUtil {
      */
     public static Properties getConf(String name, String defaultPath,
                                      String location)
-            throws FileNotFoundException {
+        throws FileNotFoundException {
         String path = getConfPath(name, location);
         Resource resource;
         if (path == null) {
diff --git a/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java b/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java
index 17e08d6..077b658 100644
--- a/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java
@@ -34,17 +34,17 @@ import org.slf4j.LoggerFactory;
 
 public class TimeUtil {
     private static final Logger LOGGER = LoggerFactory.getLogger(TimeUtil
-            .class);
+        .class);
     private static final String MILLISECONDS_PATTERN =
-            "(?i)m(illi)?s(ec(ond)?)?";
+        "(?i)m(illi)?s(ec(ond)?)?";
     private static final String SECONDS_PATTERN =
-            "(?i)s(ec(ond)?)?";
+        "(?i)s(ec(ond)?)?";
     private static final String MINUTES_PATTERN =
-            "(?i)m(in(ute)?)?";
+        "(?i)m(in(ute)?)?";
     private static final String HOURS_PATTERN =
-            "(?i)h((ou)?r)?";
+        "(?i)h((ou)?r)?";
     private static final String DAYS_PATTERN =
-            "(?i)d(ay)?";
+        "(?i)d(ay)?";
 
     private static class TimeUnitPair {
         private long t;
@@ -114,9 +114,9 @@ public class TimeUtil {
             return milliseconds(t, TimeUnit.DAYS);
         } else {
             LOGGER.warn("Time string format ERROR. " +
-                    "It only supports d(day),h(hour), m(minute), " +
-                    "s(second), ms(millsecond). " +
-                    "Please check your time format.");
+                "It only supports d(day),h(hour), m(minute), " +
+                "s(second), ms(millsecond). " +
+                "Please check your time format.");
             return 0L;
         }
     }
diff --git a/service/src/main/java/org/apache/griffin/core/util/YarnNetUtil.java b/service/src/main/java/org/apache/griffin/core/util/YarnNetUtil.java
index f8b3766..10b41ae 100644
--- a/service/src/main/java/org/apache/griffin/core/util/YarnNetUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/YarnNetUtil.java
@@ -35,13 +35,13 @@ import org.springframework.web.client.RestTemplate;
 
 public class YarnNetUtil {
     private static final Logger LOGGER = LoggerFactory
-            .getLogger(YarnNetUtil.class);
+        .getLogger(YarnNetUtil.class);
     private static RestTemplate restTemplate = new RestTemplate();
 
     /**
      * delete app task scheduling by yarn.
      *
-     * @param url prefix part of whole url
+     * @param url   prefix part of whole url
      * @param appId application id
      */
     public static void delete(String url, String appId) {
@@ -49,12 +49,12 @@ public class YarnNetUtil {
             if (appId != null) {
                 LOGGER.info("{} will delete by yarn", appId);
                 restTemplate.put(url + "ws/v1/cluster/apps/"
-                                + appId + "/state",
-                        "{\"state\": \"KILLED\"}");
+                        + appId + "/state",
+                    "{\"state\": \"KILLED\"}");
             }
         } catch (HttpClientErrorException e) {
             LOGGER.warn("client error {} from yarn: {}",
-                    e.getMessage(), e.getResponseBodyAsString());
+                e.getMessage(), e.getResponseBodyAsString());
         } catch (Exception e) {
             LOGGER.error("delete exception happens by yarn. {}", e);
         }
@@ -63,7 +63,7 @@ public class YarnNetUtil {
     /**
      * update app task scheduling by yarn.
      *
-     * @param url prefix part of whole url
+     * @param url      prefix part of whole url
      * @param instance job instance
      * @return
      */
@@ -78,7 +78,7 @@ public class YarnNetUtil {
             return true;
         } catch (HttpClientErrorException e) {
             LOGGER.warn("client error {} from yarn: {}",
-                    e.getMessage(), e.getResponseBodyAsString());
+                e.getMessage(), e.getResponseBodyAsString());
             if (e.getStatusCode() == HttpStatus.NOT_FOUND) {
                 // in sync with Livy behavior, see com.cloudera.livy.utils.SparkYarnApp
                 instance.setState(DEAD);
diff --git a/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java b/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
index 71b0887..543432e 100644
--- a/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
+++ b/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
@@ -51,7 +51,6 @@ import org.springframework.core.io.ClassPathResource;
 import org.springframework.test.context.junit4.SpringRunner;
 import org.springframework.web.client.RestTemplate;
 
-
 @RunWith(SpringRunner.class)
 public class SparkSubmitJobTest {
 
@@ -151,9 +150,9 @@ public class SparkSubmitJobTest {
         given(context.getJobDetail()).willReturn(jd);
         given(jobInstanceRepo.findByPredicateName(Matchers.anyString()))
                 .willReturn(instance);
-        Whitebox.setInternalState(sparkSubmitJob, "restTemplate", restTemplate);
-        given(restTemplate.postForObject(Matchers.anyString(), Matchers.any(),
-                Matchers.any())).willReturn(result);
+//        Whitebox.setInternalState(sparkSubmitJob, "restTemplate", restTemplate);
+//        given(restTemplate.postForObject(Matchers.anyString(), Matchers.any(),
+//                Matchers.any())).willReturn(result);
 
         sparkSubmitJob.execute(context);