You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/08/16 10:57:29 UTC
[4/4] incubator-griffin git commit: Fix bugs about code style
violation against Griffin rules
Fix bugs about code style violation against Griffin rules
Author: Eugene <to...@163.com>
Closes #388 from toyboxman/wrap.
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/3bbbcb32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/3bbbcb32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/3bbbcb32
Branch: refs/heads/master
Commit: 3bbbcb32686bb691bdef9af71ef48685d04ea63f
Parents: 10f8103
Author: Eugene <to...@163.com>
Authored: Thu Aug 16 18:57:19 2018 +0800
Committer: William Guo <gu...@apache.org>
Committed: Thu Aug 16 18:57:19 2018 +0800
----------------------------------------------------------------------
.../griffin/core/GriffinWebApplication.java | 5 +-
.../core/config/EclipseLinkJpaConfig.java | 10 +-
.../apache/griffin/core/config/EnvConfig.java | 10 +-
.../apache/griffin/core/config/LoginConfig.java | 3 +-
.../griffin/core/config/PropertiesConfig.java | 32 ++--
.../griffin/core/config/SchedulerConfig.java | 6 +-
.../core/exception/GriffinExceptionHandler.java | 19 +-
.../core/exception/GriffinExceptionMessage.java | 37 +++-
.../exception/GriffinExceptionResponse.java | 6 +-
.../griffin/core/job/BatchJobOperatorImpl.java | 109 ++++++++----
.../griffin/core/job/FileExistPredicator.java | 12 +-
.../apache/griffin/core/job/JobController.java | 29 ++-
.../apache/griffin/core/job/JobInstance.java | 94 ++++++----
.../apache/griffin/core/job/JobOperator.java | 9 +-
.../apache/griffin/core/job/JobServiceImpl.java | 175 +++++++++++++------
.../apache/griffin/core/job/SparkSubmitJob.java | 50 ++++--
.../core/job/StreamingJobOperatorImpl.java | 97 ++++++----
.../griffin/core/job/entity/AbstractJob.java | 29 ++-
.../griffin/core/job/entity/BatchJob.java | 10 +-
.../griffin/core/job/entity/JobDataSegment.java | 12 +-
.../core/job/entity/JobInstanceBean.java | 12 +-
.../griffin/core/job/entity/JobState.java | 3 +-
.../core/job/entity/LivySessionStates.java | 15 +-
.../core/job/entity/SegmentPredicate.java | 8 +-
.../griffin/core/job/entity/StreamingJob.java | 3 +-
.../factory/AutowiringSpringBeanJobFactory.java | 3 +-
.../core/job/factory/PredicatorFactory.java | 3 +-
.../griffin/core/job/repo/JobInstanceRepo.java | 9 +-
.../apache/griffin/core/job/repo/JobRepo.java | 3 +-
.../griffin/core/login/LoginController.java | 3 +-
.../core/login/LoginServiceDefaultImpl.java | 2 +-
.../core/login/LoginServiceLdapImpl.java | 24 ++-
.../measure/ExternalMeasureOperatorImpl.java | 6 +-
.../measure/GriffinMeasureOperatorImpl.java | 3 +-
.../griffin/core/measure/MeasureController.java | 12 +-
.../core/measure/MeasureOrgController.java | 4 +-
.../griffin/core/measure/MeasureOrgService.java | 5 +-
.../core/measure/MeasureOrgServiceImpl.java | 19 +-
.../core/measure/MeasureServiceImpl.java | 27 ++-
.../core/measure/entity/DataConnector.java | 32 ++--
.../griffin/core/measure/entity/DataSource.java | 8 +-
.../core/measure/entity/EvaluateRule.java | 3 +-
.../core/measure/entity/ExternalMeasure.java | 3 +-
.../core/measure/entity/GriffinMeasure.java | 22 ++-
.../griffin/core/measure/entity/Measure.java | 10 +-
.../griffin/core/measure/entity/Rule.java | 25 ++-
.../measure/entity/StreamingPreProcess.java | 5 +-
.../metastore/hive/HiveMetaStoreController.java | 3 +-
.../core/metastore/hive/HiveMetaStoreProxy.java | 13 +-
.../hive/HiveMetaStoreServiceImpl.java | 32 ++--
.../metastore/kafka/KafkaSchemaController.java | 6 +-
.../metastore/kafka/KafkaSchemaServiceImpl.java | 21 ++-
.../griffin/core/metric/MetricController.java | 17 +-
.../griffin/core/metric/MetricService.java | 3 +-
.../griffin/core/metric/MetricServiceImpl.java | 48 +++--
.../apache/griffin/core/metric/MetricStore.java | 6 +-
.../griffin/core/metric/MetricStoreImpl.java | 81 ++++++---
.../griffin/core/metric/model/Metric.java | 3 +-
.../org/apache/griffin/core/util/FSUtil.java | 33 ++--
.../org/apache/griffin/core/util/FileUtil.java | 9 +-
.../org/apache/griffin/core/util/JsonUtil.java | 24 ++-
.../apache/griffin/core/util/MeasureUtil.java | 12 +-
.../griffin/core/util/PropertiesUtil.java | 10 +-
.../org/apache/griffin/core/util/TimeUtil.java | 23 ++-
.../apache/griffin/core/util/YarnNetUtil.java | 7 +-
.../config/EclipseLinkJpaConfigForTest.java | 11 +-
.../core/config/PropertiesConfigTest.java | 8 +-
.../core/info/GriffinInfoControllerTest.java | 2 +-
.../griffin/core/job/JobControllerTest.java | 35 ++--
.../core/job/JobInstanceBeanRepoTest.java | 27 ++-
.../griffin/core/job/JobInstanceTest.java | 44 +++--
.../griffin/core/job/SparkSubmitJobTest.java | 44 +++--
.../core/job/repo/JobInstanceRepoTest.java | 8 +-
.../griffin/core/job/repo/JobRepoTest.java | 11 +-
.../ExternalMeasureOperatorImplTest.java | 9 +-
.../measure/GriffinMeasureOperatorImplTest.java | 12 +-
.../core/measure/MeasureControllerTest.java | 36 ++--
.../core/measure/MeasureOrgControllerTest.java | 3 +-
.../core/measure/MeasureOrgServiceImplTest.java | 23 ++-
.../core/measure/MeasureServiceImplTest.java | 69 +++++---
.../measure/repo/DataConnectorRepoTest.java | 17 +-
.../core/measure/repo/MeasureRepoTest.java | 3 +-
.../hive/HiveMetaStoreControllerTest.java | 28 ++-
.../hive/HiveMetaStoreServiceImplTest.java | 9 +-
.../kafka/KafkaSchemaControllerTest.java | 12 +-
.../core/metric/MetricControllerTest.java | 49 ++++--
.../core/metric/MetricServiceImplTest.java | 54 ++++--
.../core/metric/MetricStoreImplTest.java | 9 +-
.../apache/griffin/core/util/EntityHelper.java | 115 +++++++-----
.../apache/griffin/core/util/JsonUtilTest.java | 15 +-
.../griffin/core/util/PropertiesUtilTest.java | 11 +-
.../apache/griffin/core/util/TimeUtilTest.java | 8 +-
92 files changed, 1374 insertions(+), 675 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java b/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java
index e5f877b..4ed4773 100644
--- a/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java
+++ b/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java
@@ -31,7 +31,8 @@ import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class GriffinWebApplication {
- private static final Logger LOGGER = LoggerFactory.getLogger(GriffinWebApplication.class);
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(GriffinWebApplication.class);
public static void main(String[] args) {
LOGGER.info("application start");
@@ -43,4 +44,4 @@ public class GriffinWebApplication {
return new SimpleCORSFilter();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java b/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java
index ffb013a..1493569 100644
--- a/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java
@@ -36,9 +36,10 @@ import org.springframework.transaction.jta.JtaTransactionManager;
@Configuration
@ComponentScan("org.apache.griffin.core")
public class EclipseLinkJpaConfig extends JpaBaseConfiguration {
- protected EclipseLinkJpaConfig(DataSource ds, JpaProperties properties,
- ObjectProvider<JtaTransactionManager> jtm,
- ObjectProvider<TransactionManagerCustomizers> tmc) {
+ protected EclipseLinkJpaConfig(
+ DataSource ds, JpaProperties properties,
+ ObjectProvider<JtaTransactionManager> jtm,
+ ObjectProvider<TransactionManagerCustomizers> tmc) {
super(ds, properties, jtm, tmc);
}
@@ -51,7 +52,8 @@ public class EclipseLinkJpaConfig extends JpaBaseConfiguration {
protected Map<String, Object> getVendorProperties() {
Map<String, Object> map = new HashMap<>();
map.put(PersistenceUnitProperties.WEAVING, "false");
- map.put(PersistenceUnitProperties.DDL_GENERATION, "create-or-extend-tables");
+ map.put(PersistenceUnitProperties.DDL_GENERATION,
+ "create-or-extend-tables");
return map;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java b/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java
index ebd5a46..8c075a4 100644
--- a/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java
@@ -74,7 +74,8 @@ public class EnvConfig {
* @return String
* @throws IOException io exception
*/
- private static String readEnvFromAbsolutePath(String path) throws IOException {
+ private static String readEnvFromAbsolutePath(String path)
+ throws IOException {
if (path == null) {
LOGGER.warn("Parameter path is null.");
return null;
@@ -101,7 +102,8 @@ public class EnvConfig {
* @return String
* @throws IOException io exception
*/
- static String getBatchEnv(String name, String defaultPath, String location) throws IOException {
+ static String getBatchEnv(String name, String defaultPath, String location)
+ throws IOException {
if (ENV_BATCH != null) {
return ENV_BATCH;
}
@@ -116,7 +118,9 @@ public class EnvConfig {
return ENV_BATCH;
}
- static String getStreamingEnv(String name, String defaultPath, String location)
+ static String getStreamingEnv(String name,
+ String defaultPath,
+ String location)
throws IOException {
if (ENV_STREAMING != null) {
return ENV_STREAMING;
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java b/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java
index 57066a8..ca43751 100644
--- a/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java
@@ -47,7 +47,8 @@ public class LoginConfig {
case "default":
return new LoginServiceDefaultImpl();
case "ldap":
- return new LoginServiceLdapImpl(url, email, searchBase, searchPattern);
+ return new LoginServiceLdapImpl(url, email, searchBase,
+ searchPattern);
default:
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
index 16e5525..3bdb02b 100644
--- a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
@@ -45,7 +45,8 @@ import org.springframework.core.io.ClassPathResource;
@Configuration
public class PropertiesConfig {
- private static final Logger LOGGER = LoggerFactory.getLogger(PropertiesConfig.class);
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(PropertiesConfig.class);
public static Map<String, Object> livyConfMap;
@@ -53,9 +54,13 @@ public class PropertiesConfig {
private String envLocation;
- public PropertiesConfig(@Value("${external.config.location}") String configLocation, @Value("${external.env.location}") String envLocation) {
- LOGGER.info("external.config.location : {}", configLocation != null ? configLocation : "null");
- LOGGER.info("external.env.location : {}", envLocation != null ? envLocation : "null");
+ public PropertiesConfig(
+ @Value("${external.config.location}") String configLocation,
+ @Value("${external.env .location}") String envLocation) {
+ LOGGER.info("external.config.location : {}",
+ configLocation != null ? configLocation : "null");
+ LOGGER.info("external.env.location : {}",
+ envLocation != null ? envLocation : "null");
this.configLocation = configLocation;
this.envLocation = envLocation;
}
@@ -73,10 +78,12 @@ public class PropertiesConfig {
}
/**
- * Config quartz.properties will be replaced if it's found in external.config.location setting.
+ * Config quartz.properties will be replaced if it's found in external
+ * .config.location setting.
*
* @return Properties
- * @throws FileNotFoundException It'll throw FileNotFoundException when path is wrong.
+ * @throws FileNotFoundException It'll throw FileNotFoundException
+ * when path is wrong.
*/
@Bean(name = "quartzConf")
public Properties quartzConf() throws FileNotFoundException {
@@ -85,7 +92,10 @@ public class PropertiesConfig {
return getConf(name, defaultPath, configLocation);
}
- private static void genLivyConf(String name, String defaultPath, String location) throws IOException {
+ private static void genLivyConf(
+ String name,
+ String defaultPath,
+ String location) throws IOException {
if (livyConfMap != null) {
return;
}
@@ -106,13 +116,15 @@ public class PropertiesConfig {
* @return Map
* @throws IOException io exception
*/
- private static Map<String, Object> readPropertiesFromResource(String path) throws IOException {
+ private static Map<String, Object> readPropertiesFromResource(String path)
+ throws IOException {
if (path == null) {
LOGGER.warn("Parameter path is null.");
return null;
}
- // Be careful, here we use getInputStream() to convert path file to stream.
- // It'll cause FileNotFoundException if you use getFile() to convert path file to File Object
+ // Be careful, here we use getInputStream() to convert path file to
+ // stream. It'll cause FileNotFoundException if you use getFile()
+ // to convert path file to File Object
InputStream in = new ClassPathResource(path).getInputStream();
return toEntity(in, new TypeReference<Map<String, Object>>() {
});
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java b/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java
index 8c71e89..7b6af51 100644
--- a/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java
@@ -43,13 +43,15 @@ public class SchedulerConfig {
@Bean
public JobFactory jobFactory(ApplicationContext applicationContext) {
- AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory();
+ AutowiringSpringBeanJobFactory jobFactory =
+ new AutowiringSpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
}
@Bean
- public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource, JobFactory jobFactory) {
+ public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource,
+ JobFactory jobFactory) {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setOverwriteExistingJobs(true);
factory.setDataSource(dataSource);
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java
index 0a105b4..d987da7 100644
--- a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java
+++ b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java
@@ -33,22 +33,29 @@ public class GriffinExceptionHandler {
@SuppressWarnings("rawtypes")
@ExceptionHandler(GriffinException.ServiceException.class)
- public ResponseEntity handleGriffinExceptionOfServer(HttpServletRequest request, GriffinException.ServiceException e) {
+ public ResponseEntity handleGriffinExceptionOfServer(
+ HttpServletRequest request,
+ GriffinException.ServiceException e) {
String message = e.getMessage();
Throwable cause = e.getCause();
- GriffinExceptionResponse body = new GriffinExceptionResponse(HttpStatus.INTERNAL_SERVER_ERROR,
+ GriffinExceptionResponse body = new GriffinExceptionResponse(
+ HttpStatus.INTERNAL_SERVER_ERROR,
message, request.getRequestURI(), cause.getClass().getName());
return new ResponseEntity<>(body, HttpStatus.INTERNAL_SERVER_ERROR);
}
@SuppressWarnings("rawtypes")
@ExceptionHandler(GriffinException.class)
- public ResponseEntity handleGriffinExceptionOfClient(HttpServletRequest request, GriffinException e) {
- ResponseStatus responseStatus = AnnotationUtils.findAnnotation(e.getClass(), ResponseStatus.class);
+ public ResponseEntity handleGriffinExceptionOfClient(
+ HttpServletRequest request, GriffinException e) {
+ ResponseStatus responseStatus = AnnotationUtils.findAnnotation(
+ e.getClass(), ResponseStatus.class);
HttpStatus status = responseStatus.code();
String code = e.getMessage();
- GriffinExceptionMessage message = GriffinExceptionMessage.valueOf(Integer.valueOf(code));
- GriffinExceptionResponse body = new GriffinExceptionResponse(status, message, request.getRequestURI());
+ GriffinExceptionMessage message = GriffinExceptionMessage
+ .valueOf(Integer.valueOf(code));
+ GriffinExceptionResponse body = new GriffinExceptionResponse(
+ status, message, request.getRequestURI());
return new ResponseEntity<>(body, status);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
index 14d987d..ae6a0ea 100644
--- a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
+++ b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
@@ -22,35 +22,57 @@ package org.apache.griffin.core.exception;
public enum GriffinExceptionMessage {
//400, "Bad Request"
- MEASURE_TYPE_DOES_NOT_MATCH(40001, "Property 'measure.type' does not match the type of measure in request body"),
- INVALID_CONNECTOR_NAME(40002, "Property 'name' in 'connectors' field is invalid"),
+ MEASURE_TYPE_DOES_NOT_MATCH(40001, "Property 'measure.type' does not match"
+ + "the type of measure in request body"),
+ INVALID_CONNECTOR_NAME(40002, "Property 'name' in 'connectors' " +
+ "field is invalid"),
MISSING_METRIC_NAME(40003, "Missing property 'metricName'"),
INVALID_JOB_NAME(40004, "Property 'job.name' is invalid"),
MISSING_BASELINE_CONFIG(40005, "Missing 'as.baseline' config in 'data.segments'"),
+
INVALID_METRIC_RECORDS_OFFSET(40006, "Offset must not be less than zero"),
+
INVALID_METRIC_RECORDS_SIZE(40007, "Size must not be less than zero"),
+
INVALID_METRIC_VALUE_FORMAT(40008, "Metric value format is invalid"),
+
INVALID_MEASURE_ID(40009, "Property 'measure.id' is invalid"),
+
INVALID_CRON_EXPRESSION(40010, "Property 'cron.expression' is invalid"),
+
MEASURE_TYPE_DOES_NOT_SUPPORT(40011, "We don't support such measure type."),
+
JOB_TYPE_DOES_NOT_SUPPORT(40011, "We don't support such job type."),
- STREAMING_JOB_IS_RUNNING(40012, "There is no need to start again as job is RUNNING."),
- STREAMING_JOB_IS_STOPPED(40012, "There is no need to stop again as job is STOPPED."),
+
+ STREAMING_JOB_IS_RUNNING(40012, "There is no need to start again " +
+ "as job is RUNNING."),
+ STREAMING_JOB_IS_STOPPED(40012, "There is no need to stop again " +
+ "as job is STOPPED."),
JOB_IS_NOT_SCHEDULED(40013, "The job isn't scheduled."),
+
JOB_IS_NOT_IN_PAUSED_STATUS(40014, "The job isn't in paused status."),
+
JOB_IS_IN_PAUSED_STATUS(40015, "The job is already in paused status."),
//404, "Not Found"
MEASURE_ID_DOES_NOT_EXIST(40401, "Measure id does not exist"),
+
JOB_ID_DOES_NOT_EXIST(40402, "Job id does not exist"),
+
JOB_NAME_DOES_NOT_EXIST(40403, "Job name does not exist"),
+
NO_SUCH_JOB_ACTION(40404, "No such job action"),
- JOB_KEY_DOES_NOT_EXIST(40405, "Job key which consists of group and name does not exist."),
- ORGANIZATION_NAME_DOES_NOT_EXIST(40406, "Organization name does not exist"),
+
+ JOB_KEY_DOES_NOT_EXIST(40405, "Job key which consists of " +
+ "group and name does not exist."),
+ ORGANIZATION_NAME_DOES_NOT_EXIST(40406, "Organization name " +
+ "does not exist"),
+
HDFS_FILE_NOT_EXIST(40407, "Hadoop data file not exist"),
//409, "Conflict"
MEASURE_NAME_ALREADY_EXIST(40901, "Measure name already exists"),
+
QUARTZ_JOB_ALREADY_EXIST(40902, "Quartz job already exist");
private final int code;
@@ -70,7 +92,8 @@ public enum GriffinExceptionMessage {
return message;
}
}
- throw new IllegalArgumentException("No matching constant for [" + code + "]");
+ throw new IllegalArgumentException("No matching constant for ["
+ + code + "]");
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionResponse.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionResponse.java b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionResponse.java
index db8f766..47e28da 100644
--- a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionResponse.java
+++ b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionResponse.java
@@ -37,7 +37,8 @@ public class GriffinExceptionResponse {
private String path;
- GriffinExceptionResponse(HttpStatus status, GriffinExceptionMessage message, String path) {
+ GriffinExceptionResponse(HttpStatus status, GriffinExceptionMessage message,
+ String path) {
this.status = status.value();
this.error = status.getReasonPhrase();
this.code = Integer.toString(message.getCode());
@@ -45,7 +46,8 @@ public class GriffinExceptionResponse {
this.path = path;
}
- GriffinExceptionResponse(HttpStatus status, String message, String path, String exception) {
+ GriffinExceptionResponse(HttpStatus status, String message, String path,
+ String exception) {
this.status = status.value();
this.error = status.getReasonPhrase();
this.message = message;
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
index bc73cd8..5b2f816 100644
--- a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
@@ -51,7 +51,8 @@ import org.springframework.util.StringUtils;
@Service
public class BatchJobOperatorImpl implements JobOperator {
- private static final Logger LOGGER = LoggerFactory.getLogger(BatchJobOperatorImpl.class);
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(BatchJobOperatorImpl.class);
@Autowired
private SchedulerFactoryBean factory;
@@ -64,7 +65,8 @@ public class BatchJobOperatorImpl implements JobOperator {
@Override
@Transactional(rollbackFor = Exception.class)
- public AbstractJob add(AbstractJob job, GriffinMeasure measure) throws Exception {
+ public AbstractJob add(AbstractJob job, GriffinMeasure measure)
+ throws Exception {
validateParams(job, measure);
String qName = jobService.getQuartzName(job);
String qGroup = jobService.getQuartzGroup();
@@ -75,7 +77,9 @@ public class BatchJobOperatorImpl implements JobOperator {
return job;
}
- private BatchJob genBatchJobBean(AbstractJob job, String qName, String qGroup) {
+ private BatchJob genBatchJobBean(AbstractJob job,
+ String qName,
+ String qGroup) {
BatchJob batchJob = (BatchJob) job;
batchJob.setMetricName(job.getJobName());
batchJob.setGroup(qGroup);
@@ -96,17 +100,21 @@ public class BatchJobOperatorImpl implements JobOperator {
String group = job.getGroup();
TriggerState state = getTriggerState(name, group);
if (state == null) {
- throw new GriffinException.BadRequestException(JOB_IS_NOT_SCHEDULED);
+ throw new GriffinException.BadRequestException(
+ JOB_IS_NOT_SCHEDULED);
}
- /* If job is not in paused state,we can't start it as it may be RUNNING. */
+ /* If job is not in paused state,we can't start it
+ as it may be RUNNING.*/
if (state != PAUSED) {
- throw new GriffinException.BadRequestException(JOB_IS_NOT_IN_PAUSED_STATUS);
+ throw new GriffinException.BadRequestException
+ (JOB_IS_NOT_IN_PAUSED_STATUS);
}
JobKey jobKey = jobKey(name, group);
try {
factory.getScheduler().resumeJob(jobKey);
} catch (SchedulerException e) {
- throw new GriffinException.ServiceException("Failed to start job.", e);
+ throw new GriffinException.ServiceException(
+ "Failed to start job.", e);
}
}
@@ -123,19 +131,23 @@ public class BatchJobOperatorImpl implements JobOperator {
@Override
- public JobHealth getHealth(JobHealth jobHealth, AbstractJob job) throws SchedulerException {
- List<? extends Trigger> triggers = jobService.getTriggers(job.getName(), job.getGroup());
+ public JobHealth getHealth(JobHealth jobHealth, AbstractJob job)
+ throws SchedulerException {
+ List<? extends Trigger> triggers = jobService
+ .getTriggers(job.getName(), job.getGroup());
if (!CollectionUtils.isEmpty(triggers)) {
jobHealth.setJobCount(jobHealth.getJobCount() + 1);
if (jobService.isJobHealthy(job.getId())) {
- jobHealth.setHealthyJobCount(jobHealth.getHealthyJobCount() + 1);
+ jobHealth.setHealthyJobCount(
+ jobHealth.getHealthyJobCount() + 1);
}
}
return jobHealth;
}
@Override
- public JobState getState(AbstractJob job, String action) throws SchedulerException {
+ public JobState getState(AbstractJob job, String action)
+ throws SchedulerException {
JobState jobState = new JobState();
Scheduler scheduler = factory.getScheduler();
if (job.getGroup() == null || job.getName() == null) {
@@ -150,24 +162,30 @@ public class BatchJobOperatorImpl implements JobOperator {
return jobState;
}
- private void setTriggerTime(AbstractJob job, JobState jobState) throws SchedulerException {
- List<? extends Trigger> triggers = jobService.getTriggers(job.getName(), job.getGroup());
- // If triggers are empty, in Griffin it means job is completed whose trigger state is NONE or not scheduled.
+ private void setTriggerTime(AbstractJob job, JobState jobState)
+ throws SchedulerException {
+ List<? extends Trigger> triggers = jobService
+ .getTriggers(job.getName(), job.getGroup());
+ // If triggers are empty, in Griffin it means job is completed whose
+ // trigger state is NONE or not scheduled.
if (CollectionUtils.isEmpty(triggers)) {
return;
}
Trigger trigger = triggers.get(0);
Date nextFireTime = trigger.getNextFireTime();
Date previousFireTime = trigger.getPreviousFireTime();
- jobState.setNextFireTime(nextFireTime != null ? nextFireTime.getTime() : -1);
- jobState.setPreviousFireTime(previousFireTime != null ? previousFireTime.getTime() : -1);
+ jobState.setNextFireTime(nextFireTime != null ?
+ nextFireTime.getTime() : -1);
+ jobState.setPreviousFireTime(previousFireTime != null ?
+ previousFireTime.getTime() : -1);
}
/**
* only PAUSED state of job can be started
*
* @param state job state
- * @return true: job can be started, false: job is running which cannot be started
+ * @return true: job can be started, false: job is running which cannot be
+ * started
*/
private boolean getStartStatus(TriggerState state) {
return state == PAUSED;
@@ -177,7 +195,8 @@ public class BatchJobOperatorImpl implements JobOperator {
* only NORMAL or BLOCKED state of job can be started
*
* @param state job state
- * @return true: job can be stopped, false: job is running which cannot be stopped
+ * @return true: job can be stopped, false: job is running which cannot be
+ * stopped
*/
private boolean getStopStatus(TriggerState state) {
return state == NORMAL || state == BLOCKED;
@@ -186,7 +205,8 @@ public class BatchJobOperatorImpl implements JobOperator {
private TriggerState getTriggerState(String name, String group) {
try {
- List<? extends Trigger> triggers = jobService.getTriggers(name, group);
+ List<? extends Trigger> triggers = jobService.getTriggers(name,
+ group);
if (CollectionUtils.isEmpty(triggers)) {
return null;
}
@@ -194,7 +214,8 @@ public class BatchJobOperatorImpl implements JobOperator {
return factory.getScheduler().getTriggerState(key);
} catch (SchedulerException e) {
LOGGER.error("Failed to delete job", e);
- throw new GriffinException.ServiceException("Failed to delete job", e);
+ throw new GriffinException
+ .ServiceException("Failed to delete job", e);
}
}
@@ -202,7 +223,8 @@ public class BatchJobOperatorImpl implements JobOperator {
/**
* @param job griffin job
- * @param delete if job needs to be deleted,set isNeedDelete true,otherwise it just will be paused.
+ * @param delete if job needs to be deleted,set isNeedDelete true,otherwise
+ * it just will be paused.
*/
private void pauseJob(BatchJob job, boolean delete) {
try {
@@ -212,7 +234,8 @@ public class BatchJobOperatorImpl implements JobOperator {
batchJobRepo.save(job);
} catch (Exception e) {
LOGGER.error("Job schedule happens exception.", e);
- throw new GriffinException.ServiceException("Job schedule happens exception.", e);
+ throw new GriffinException.ServiceException("Job schedule " +
+ "happens exception.", e);
}
}
@@ -220,7 +243,8 @@ public class BatchJobOperatorImpl implements JobOperator {
List<JobInstanceBean> instances = instanceRepo.findByJobId(job.getId());
for (JobInstanceBean instance : instances) {
if (!instance.isPredicateDeleted()) {
- deleteJob(instance.getPredicateGroup(), instance.getPredicateName());
+ deleteJob(instance.getPredicateGroup(), instance
+ .getPredicateName());
instance.setPredicateDeleted(true);
if (instance.getState().equals(LivySessionStates.State.FINDING)) {
instance.setState(LivySessionStates.State.NOT_FOUND);
@@ -233,7 +257,8 @@ public class BatchJobOperatorImpl implements JobOperator {
Scheduler scheduler = factory.getScheduler();
JobKey jobKey = new JobKey(name, group);
if (!scheduler.checkExists(jobKey)) {
- LOGGER.info("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName());
+ LOGGER.info("Job({},{}) does not exist.", jobKey.getGroup(), jobKey
+ .getName());
return;
}
scheduler.deleteJob(jobKey);
@@ -247,8 +272,10 @@ public class BatchJobOperatorImpl implements JobOperator {
Scheduler scheduler = factory.getScheduler();
JobKey jobKey = new JobKey(name, group);
if (!scheduler.checkExists(jobKey)) {
- LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName());
- throw new GriffinException.NotFoundException(JOB_KEY_DOES_NOT_EXIST);
+ LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), jobKey
+ .getName());
+ throw new GriffinException.NotFoundException
+ (JOB_KEY_DOES_NOT_EXIST);
}
scheduler.pauseJob(jobKey);
}
@@ -267,7 +294,8 @@ public class BatchJobOperatorImpl implements JobOperator {
return pauseStatus;
}
- private boolean pauseJobInstance(JobInstanceBean instance, List<JobInstanceBean> deletedInstances) {
+ private boolean pauseJobInstance(JobInstanceBean instance,
+ List<JobInstanceBean> deletedInstances) {
boolean status = true;
String pGroup = instance.getPredicateGroup();
String pName = instance.getPredicateName();
@@ -278,7 +306,8 @@ public class BatchJobOperatorImpl implements JobOperator {
deletedInstances.add(instance);
}
} catch (SchedulerException e) {
- LOGGER.error("Failed to pause predicate job({},{}).", pGroup, pName);
+ LOGGER.error("Failed to pause predicate job({},{}).", pGroup,
+ pName);
status = false;
}
return status;
@@ -289,14 +318,17 @@ public class BatchJobOperatorImpl implements JobOperator {
throw new GriffinException.BadRequestException(INVALID_JOB_NAME);
}
if (!isValidCronExpression(job.getCronExpression())) {
- throw new GriffinException.BadRequestException(INVALID_CRON_EXPRESSION);
+ throw new GriffinException.BadRequestException
+ (INVALID_CRON_EXPRESSION);
}
if (!isValidBaseLine(job.getSegments())) {
- throw new GriffinException.BadRequestException(MISSING_BASELINE_CONFIG);
+ throw new GriffinException.BadRequestException
+ (MISSING_BASELINE_CONFIG);
}
List<String> names = getConnectorNames(measure);
if (!isValidConnectorNames(job.getSegments(), names)) {
- throw new GriffinException.BadRequestException(INVALID_CONNECTOR_NAME);
+ throw new GriffinException.BadRequestException
+ (INVALID_CONNECTOR_NAME);
}
}
@@ -319,24 +351,29 @@ public class BatchJobOperatorImpl implements JobOperator {
return true;
}
}
- LOGGER.warn("Please set segment timestamp baseline in as.baseline field.");
+ LOGGER.warn("Please set segment timestamp baseline " +
+ "in as.baseline field.");
return false;
}
- private boolean isValidConnectorNames(List<JobDataSegment> segments, List<String> names) {
+ private boolean isValidConnectorNames(List<JobDataSegment> segments,
+ List<String> names) {
assert segments != null;
Set<String> sets = new HashSet<>();
for (JobDataSegment segment : segments) {
String dcName = segment.getDataConnectorName();
sets.add(dcName);
- boolean exist = names.stream().anyMatch(name -> name.equals(dcName));
+ boolean exist = names.stream().anyMatch(name -> name.equals
+ (dcName));
if (!exist) {
- LOGGER.warn("Param {} is a illegal string. Please input one of strings in {}.", dcName, names);
+ LOGGER.warn("Param {} is a illegal string. " +
+ "Please input one of strings in {}.", dcName, names);
return false;
}
}
if (sets.size() < segments.size()) {
- LOGGER.warn("Connector names in job data segment cannot duplicate.");
+ LOGGER.warn("Connector names in job data segment " +
+ "cannot duplicate.");
return false;
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
index 35f981f..703a837 100644
--- a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
+++ b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
@@ -32,7 +32,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FileExistPredicator implements Predicator {
- private static final Logger LOGGER = LoggerFactory.getLogger(FileExistPredicator.class);
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(FileExistPredicator.class);
private static final String PREDICT_PATH = "path";
private static final String PREDICT_ROOT_PATH = "root.path";
@@ -48,12 +49,15 @@ public class FileExistPredicator implements Predicator {
Map<String, Object> config = predicate.getConfigMap();
String[] paths = null;
String rootPath = null;
- if (config != null && !StringUtils.isEmpty((String) config.get(PREDICT_PATH))) {
- paths = ((String) config.get(PREDICT_PATH)).split(PATH_CONNECTOR_CHARACTER);
+ if (config != null && !StringUtils.isEmpty((String) config.get
+ (PREDICT_PATH))) {
+ paths = ((String) config.get(PREDICT_PATH)).split
+ (PATH_CONNECTOR_CHARACTER);
rootPath = (String) config.get(PREDICT_ROOT_PATH);
}
if (ArrayUtils.isEmpty(paths) || StringUtils.isEmpty(rootPath)) {
- LOGGER.error("Predicate path is null.Please check predicates config root.path and path.");
+ LOGGER.error("Predicate path is null.Please check predicates " +
+ "config root.path and path.");
throw new NullPointerException();
}
for (String path : paths) {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/JobController.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobController.java b/service/src/main/java/org/apache/griffin/core/job/JobController.java
index 64b8e42..f4ee791 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobController.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobController.java
@@ -48,7 +48,8 @@ public class JobController {
private JobService jobService;
@RequestMapping(value = "/jobs", method = RequestMethod.GET)
- public List<AbstractJob> getJobs(@RequestParam(value = "type", defaultValue = "") String type) {
+ public List<AbstractJob> getJobs(@RequestParam(value = "type",
+ defaultValue = "") String type) {
return jobService.getAliveJobs(type);
}
@@ -65,24 +66,31 @@ public class JobController {
@RequestMapping(value = "/jobs/{id}", method = RequestMethod.PUT)
@ResponseStatus(HttpStatus.OK)
- public AbstractJob onActions(@PathVariable("id") Long jobId, @RequestParam String action) throws Exception {
+ public AbstractJob onActions(
+ @PathVariable("id") Long jobId,
+ @RequestParam String action) throws Exception {
return jobService.onAction(jobId, action);
}
@RequestMapping(value = "/jobs", method = RequestMethod.DELETE)
@ResponseStatus(HttpStatus.NO_CONTENT)
- public void deleteJob(@RequestParam("jobName") String jobName) throws SchedulerException {
+ public void deleteJob(@RequestParam("jobName") String jobName)
+ throws SchedulerException {
jobService.deleteJob(jobName);
}
@RequestMapping(value = "/jobs/{id}", method = RequestMethod.DELETE)
@ResponseStatus(HttpStatus.NO_CONTENT)
- public void deleteJob(@PathVariable("id") Long id) throws SchedulerException {
+ public void deleteJob(@PathVariable("id") Long id)
+ throws SchedulerException {
jobService.deleteJob(id);
}
@RequestMapping(value = "/jobs/instances", method = RequestMethod.GET)
- public List<JobInstanceBean> findInstancesOfJob(@RequestParam("jobId") Long id, @RequestParam("page") int page, @RequestParam("size") int size) {
+ public List<JobInstanceBean> findInstancesOfJob(
+ @RequestParam("jobId") Long id,
+ @RequestParam("page") int page,
+ @RequestParam("size") int size) {
return jobService.findInstancesOfJob(id, page, size);
}
@@ -92,11 +100,16 @@ public class JobController {
}
@RequestMapping(path = "/jobs/download", method = RequestMethod.GET)
- public ResponseEntity<Resource> download(@RequestParam("jobName") String jobName, @RequestParam("ts") long timestamp) throws Exception {
+ public ResponseEntity<Resource> download(
+ @RequestParam("jobName") String jobName,
+ @RequestParam("ts") long timestamp)
+ throws Exception {
String path = jobService.getJobHdfsSinksPath(jobName, timestamp);
- InputStreamResource resource = new InputStreamResource(FSUtil.getMissSampleInputStream(path));
+ InputStreamResource resource = new InputStreamResource(
+ FSUtil.getMissSampleInputStream(path));
return ResponseEntity.ok().
- header("content-disposition", "attachment; filename = sampleMissingData.json")
+ header("content-disposition",
+ "attachment; filename = sampleMissingData.json")
.contentType(MediaType.APPLICATION_OCTET_STREAM)
.body(resource);
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
index ab67c81..afb25d8 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
@@ -78,7 +78,8 @@ import org.springframework.transaction.annotation.Transactional;
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class JobInstance implements Job {
- private static final Logger LOGGER = LoggerFactory.getLogger(JobInstance.class);
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(JobInstance.class);
public static final String MEASURE_KEY = "measure";
public static final String PREDICATES_KEY = "predicts";
public static final String PREDICATE_JOB_NAME = "predicateJobName";
@@ -86,7 +87,8 @@ public class JobInstance implements Job {
static final String PATH_CONNECTOR_CHARACTER = ",";
public static final String INTERVAL = "interval";
public static final String REPEAT = "repeat";
- public static final String CHECK_DONEFILE_SCHEDULE = "checkdonefile.schedule";
+ public static final String CHECK_DONEFILE_SCHEDULE =
+ "checkdonefile.schedule";
@Autowired
private SchedulerFactoryBean factory;
@@ -104,7 +106,6 @@ public class JobInstance implements Job {
private List<SegmentPredicate> mPredicates;
private Long jobStartTime;
-
@Override
@Transactional
public void execute(JobExecutionContext context) {
@@ -117,7 +118,8 @@ public class JobInstance implements Job {
}
}
- private void initParam(JobExecutionContext context) throws SchedulerException {
+ private void initParam(JobExecutionContext context)
+ throws SchedulerException {
mPredicates = new ArrayList<>();
JobDetail jobDetail = context.getJobDetail();
Long jobId = jobDetail.getJobDataMap().getLong(GRIFFIN_JOB_ID);
@@ -128,20 +130,24 @@ public class JobInstance implements Job {
}
@SuppressWarnings("unchecked")
- private void setJobStartTime(JobDetail jobDetail) throws SchedulerException {
+ private void setJobStartTime(JobDetail jobDetail)
+ throws SchedulerException {
Scheduler scheduler = factory.getScheduler();
JobKey jobKey = jobDetail.getKey();
- List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey);
+ List<Trigger> triggers =
+ (List<Trigger>) scheduler.getTriggersOfJob(jobKey);
Date triggerTime = triggers.get(0).getPreviousFireTime();
jobStartTime = triggerTime.getTime();
}
- private void setSourcesPartitionsAndPredicates(List<DataSource> sources) throws Exception {
+ private void setSourcesPartitionsAndPredicates(List<DataSource> sources)
+ throws Exception {
boolean isFirstBaseline = true;
for (JobDataSegment jds : job.getSegments()) {
if (jds.isAsTsBaseline() && isFirstBaseline) {
- Long tsOffset = TimeUtil.str2Long(jds.getSegmentRange().getBegin());
+ Long tsOffset = TimeUtil.str2Long(
+ jds.getSegmentRange().getBegin());
measure.setTimestamp(jobStartTime + tsOffset);
isFirstBaseline = false;
}
@@ -151,7 +157,8 @@ public class JobInstance implements Job {
}
}
- private void setDataSourcePartitions(JobDataSegment jds, DataSource ds) throws Exception {
+ private void setDataSourcePartitions(JobDataSegment jds, DataSource ds)
+ throws Exception {
List<DataConnector> connectors = ds.getConnectors();
for (DataConnector dc : connectors) {
setDataConnectorPartitions(jds, dc);
@@ -159,7 +166,9 @@ public class JobInstance implements Job {
}
- private void setDataConnectorPartitions(JobDataSegment jds, DataConnector dc) throws Exception {
+ private void setDataConnectorPartitions(
+ JobDataSegment jds,
+ DataConnector dc) throws Exception {
String dcName = jds.getDataConnectorName();
if (dcName.equals(dc.getName())) {
Long[] sampleTs = genSampleTs(jds.getSegmentRange(), dc);
@@ -179,7 +188,8 @@ public class JobInstance implements Job {
Long offset = TimeUtil.str2Long(segRange.getBegin());
Long range = TimeUtil.str2Long(segRange.getLength());
String unit = dc.getDataUnit();
- Long dataUnit = TimeUtil.str2Long(StringUtils.isEmpty(unit) ? dc.getDefaultDataUnit() : unit);
+ Long dataUnit = TimeUtil.str2Long(StringUtils.isEmpty(unit) ? dc
+ .getDefaultDataUnit() : unit);
//offset usually is negative
Long dataStartTime = jobStartTime + offset;
if (range < 0) {
@@ -206,7 +216,9 @@ public class JobInstance implements Job {
private void setConnectorPredicates(DataConnector dc, Long[] sampleTs) {
List<SegmentPredicate> predicates = dc.getPredicates();
for (SegmentPredicate predicate : predicates) {
- genConfMap(predicate.getConfigMap(), sampleTs, dc.getDataTimeZone());
+ genConfMap(predicate.getConfigMap(),
+ sampleTs,
+ dc.getDataTimeZone());
//Do not forget to update origin string config
predicate.setConfigMap(predicate.getConfigMap());
mPredicates.add(predicate);
@@ -222,11 +234,15 @@ public class JobInstance implements Job {
/**
* @param conf config map
* @param sampleTs collection of data split start timestamp
- * @return all config data combine,like {"where": "year=2017 AND month=11 AND dt=15 AND hour=09,year=2017 AND month=11 AND dt=15 AND hour=10"}
- * or like {"path": "/year=2017/month=11/dt=15/hour=09/_DONE,/year=2017/month=11/dt=15/hour=10/_DONE"}
+ * @return all config data combine,like {"where": "year=2017 AND month=11
+ * AND dt=15 AND hour=09,year=2017 AND month=11 AND
+ * dt=15 AND hour=10"}
+ * or like {"path": "/year=2017/month=11/dt=15/hour=09/_DONE
+ * ,/year=2017/month=11/dt=15/hour=10/_DONE"}
*/
- private void genConfMap(Map<String, Object> conf, Long[] sampleTs, String timezone) {
+ private void genConfMap(Map<String, Object> conf, Long[] sampleTs, String
+ timezone) {
if (conf == null) {
LOGGER.warn("Predicate config is null.");
return;
@@ -240,9 +256,11 @@ public class JobInstance implements Job {
continue;
}
for (Long timestamp : sampleTs) {
- set.add(TimeUtil.format(value, timestamp, getTimeZone(timezone)));
+ set.add(TimeUtil.format(value, timestamp,
+ getTimeZone(timezone)));
}
- conf.put(entry.getKey(), StringUtils.join(set, PATH_CONNECTOR_CHARACTER));
+ conf.put(entry.getKey(), StringUtils.join(set,
+ PATH_CONNECTOR_CHARACTER));
}
}
}
@@ -255,16 +273,20 @@ public class JobInstance implements Job {
}
@SuppressWarnings("unchecked")
- private void createJobInstance(Map<String, Object> confMap) throws Exception {
+ private void createJobInstance(Map<String, Object> confMap)
+ throws Exception {
confMap = checkConfMap(confMap != null ? confMap : new HashMap<>());
- Map<String, Object> config = (Map<String, Object>) confMap.get(CHECK_DONEFILE_SCHEDULE);
+ Map<String, Object> config = (Map<String, Object>) confMap
+ .get(CHECK_DONEFILE_SCHEDULE);
Long interval = TimeUtil.str2Long((String) config.get(INTERVAL));
Integer repeat = Integer.valueOf(config.get(REPEAT).toString());
String groupName = "PG";
- String jobName = job.getJobName() + "_predicate_" + System.currentTimeMillis();
+ String jobName = job.getJobName() + "_predicate_" + System
+ .currentTimeMillis();
TriggerKey tk = triggerKey(jobName, groupName);
if (factory.getScheduler().checkExists(tk)) {
- throw new GriffinException.ConflictException(QUARTZ_JOB_ALREADY_EXIST);
+ throw new GriffinException.ConflictException
+ (QUARTZ_JOB_ALREADY_EXIST);
}
saveJobInstance(jobName, groupName);
createJobInstance(tk, interval, repeat, jobName);
@@ -272,7 +294,8 @@ public class JobInstance implements Job {
@SuppressWarnings("unchecked")
Map<String, Object> checkConfMap(Map<String, Object> confMap) {
- Map<String, Object> config = (Map<String, Object>) confMap.get(CHECK_DONEFILE_SCHEDULE);
+ Map<String, Object> config = (Map<String, Object>) confMap.get
+ (CHECK_DONEFILE_SCHEDULE);
String interval = env.getProperty("predicate.job.interval");
interval = interval != null ? interval : "5m";
String repeat = env.getProperty("predicate.job.repeat.count");
@@ -294,30 +317,38 @@ public class JobInstance implements Job {
}
private void saveJobInstance(String pName, String pGroup) {
- ProcessType type = measure.getProcessType() == BATCH ? BATCH : STREAMING;
+ ProcessType type = measure.getProcessType() == BATCH ? BATCH :
+ STREAMING;
Long tms = System.currentTimeMillis();
String expired = env.getProperty("jobInstance.expired.milliseconds");
- Long expireTms = Long.valueOf(expired != null ? expired : "604800000") + tms;
- JobInstanceBean instance = new JobInstanceBean(FINDING, pName, pGroup, tms, expireTms, type);
+ Long expireTms = Long.valueOf(expired != null ? expired : "604800000")
+ + tms;
+ JobInstanceBean instance = new JobInstanceBean(FINDING, pName, pGroup,
+ tms, expireTms, type);
instance.setJob(job);
instanceRepo.save(instance);
}
- private void createJobInstance(TriggerKey tk, Long interval, Integer repeatCount, String pJobName) throws Exception {
+ private void createJobInstance(TriggerKey tk, Long interval, Integer
+ repeatCount, String pJobName) throws Exception {
JobDetail jobDetail = addJobDetail(tk, pJobName);
- Trigger trigger = genTriggerInstance(tk, jobDetail, interval, repeatCount);
+ Trigger trigger = genTriggerInstance(tk, jobDetail, interval,
+ repeatCount);
factory.getScheduler().scheduleJob(trigger);
}
- private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, Long interval, Integer repeatCount) {
+ private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, Long
+ interval, Integer repeatCount) {
return newTrigger().withIdentity(tk).forJob(jd).startNow()
- .withSchedule(simpleSchedule().withIntervalInMilliseconds(interval).withRepeatCount(repeatCount))
+ .withSchedule(simpleSchedule().withIntervalInMilliseconds
+ (interval).withRepeatCount(repeatCount))
.build();
}
- private JobDetail addJobDetail(TriggerKey tk, String pJobName) throws SchedulerException, IOException {
+ private JobDetail addJobDetail(TriggerKey tk, String pJobName)
+ throws SchedulerException, IOException {
Scheduler scheduler = factory.getScheduler();
JobKey jobKey = jobKey(tk.getName(), tk.getGroup());
JobDetail jobDetail;
@@ -335,7 +366,8 @@ public class JobInstance implements Job {
return jobDetail;
}
- private void setJobDataMap(JobDetail jobDetail, String pJobName) throws IOException {
+ private void setJobDataMap(JobDetail jobDetail, String pJobName)
+ throws IOException {
JobDataMap dataMap = jobDetail.getJobDataMap();
preProcessMeasure();
String result = toJson(measure);
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobOperator.java b/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
index 7fdbe7d..81c3b17 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
@@ -26,7 +26,8 @@ import org.apache.griffin.core.measure.entity.GriffinMeasure;
import org.quartz.SchedulerException;
public interface JobOperator {
- AbstractJob add(AbstractJob job, GriffinMeasure measure) throws Exception;
+ AbstractJob add(AbstractJob job, GriffinMeasure measure)
+ throws Exception;
void start(AbstractJob job) throws Exception;
@@ -34,7 +35,9 @@ public interface JobOperator {
void delete(AbstractJob job) throws SchedulerException;
- JobHealth getHealth(JobHealth jobHealth, AbstractJob job) throws SchedulerException;
+ JobHealth getHealth(JobHealth jobHealth, AbstractJob job)
+ throws SchedulerException;
- JobState getState(AbstractJob job, String action) throws SchedulerException;
+ JobState getState(AbstractJob job, String action)
+ throws SchedulerException;
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
index 5617065..ac14461 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
@@ -101,7 +101,8 @@ import org.springframework.web.client.RestTemplate;
@Service
public class JobServiceImpl implements JobService {
- private static final Logger LOGGER = LoggerFactory.getLogger(JobServiceImpl.class);
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(JobServiceImpl.class);
public static final String GRIFFIN_JOB_ID = "griffinJobId";
private static final int MAX_PAGE_SIZE = 1024;
private static final int DEFAULT_PAGE_SIZE = 10;
@@ -156,7 +157,8 @@ public class JobServiceImpl implements JobService {
}
} catch (SchedulerException e) {
LOGGER.error("Failed to get RUNNING jobs.", e);
- throw new GriffinException.ServiceException("Failed to get RUNNING jobs.", e);
+ throw new GriffinException
+ .ServiceException("Failed to get RUNNING jobs.", e);
}
return dataList;
}
@@ -174,7 +176,8 @@ public class JobServiceImpl implements JobService {
AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false);
if (job == null) {
LOGGER.warn("Job id {} does not exist.", jobId);
- throw new GriffinException.NotFoundException(JOB_ID_DOES_NOT_EXIST);
+ throw new GriffinException
+ .NotFoundException(JOB_ID_DOES_NOT_EXIST);
}
return job;
}
@@ -194,7 +197,8 @@ public class JobServiceImpl implements JobService {
return job;
}
- private void doAction(String action, AbstractJob job, JobOperator op) throws Exception {
+ private void doAction(String action, AbstractJob job, JobOperator op)
+ throws Exception {
switch (action) {
case START:
op.start(job);
@@ -203,7 +207,8 @@ public class JobServiceImpl implements JobService {
op.stop(job);
break;
default:
- throw new GriffinException.NotFoundException(NO_SUCH_JOB_ACTION);
+ throw new GriffinException
+ .NotFoundException(NO_SUCH_JOB_ACTION);
}
}
@@ -233,7 +238,8 @@ public class JobServiceImpl implements JobService {
List<AbstractJob> jobs = jobRepo.findByJobNameAndDeleted(name, false);
if (CollectionUtils.isEmpty(jobs)) {
LOGGER.warn("There is no job with '{}' name.", name);
- throw new GriffinException.NotFoundException(JOB_NAME_DOES_NOT_EXIST);
+ throw new GriffinException
+ .NotFoundException(JOB_NAME_DOES_NOT_EXIST);
}
for (AbstractJob job : jobs) {
JobOperator op = getJobOperator(job);
@@ -242,16 +248,22 @@ public class JobServiceImpl implements JobService {
}
@Override
- public List<JobInstanceBean> findInstancesOfJob(Long jobId, int page, int size) {
+ public List<JobInstanceBean> findInstancesOfJob(
+ Long jobId,
+ int page,
+ int size) {
AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false);
if (job == null) {
LOGGER.warn("Job id {} does not exist.", jobId);
- throw new GriffinException.NotFoundException(JOB_ID_DOES_NOT_EXIST);
+ throw new GriffinException
+ .NotFoundException(JOB_ID_DOES_NOT_EXIST);
}
size = size > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : size;
size = size <= 0 ? DEFAULT_PAGE_SIZE : size;
- Pageable pageable = new PageRequest(page, size, Sort.Direction.DESC, "tms");
- List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId, pageable);
+ Pageable pageable = new PageRequest(page, size,
+ Sort.Direction.DESC, "tms");
+ List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId,
+ pageable);
return updateState(instances);
}
@@ -266,7 +278,8 @@ public class JobServiceImpl implements JobService {
}
/**
- * a job is regard as healthy job when its latest instance is in healthy state.
+ * a job is regard as healthy job when its latest instance is in healthy
+ * state.
*
* @return job healthy statistics
*/
@@ -280,7 +293,8 @@ public class JobServiceImpl implements JobService {
jobHealth = op.getHealth(jobHealth, job);
} catch (SchedulerException e) {
LOGGER.error("Job schedule exception. {}", e);
- throw new GriffinException.ServiceException("Fail to Get HealthInfo", e);
+ throw new GriffinException
+ .ServiceException("Fail to Get HealthInfo", e);
}
}
@@ -290,7 +304,9 @@ public class JobServiceImpl implements JobService {
@Scheduled(fixedDelayString = "${jobInstance.expired.milliseconds}")
public void deleteExpiredJobInstance() {
Long timeMills = System.currentTimeMillis();
- List<JobInstanceBean> instances = instanceRepo.findByExpireTmsLessThanEqual(timeMills);
+ List<JobInstanceBean> instances = instanceRepo
+ .findByExpireTmsLessThanEqual
+ (timeMills);
if (!batchJobOp.pauseJobInstances(instances)) {
LOGGER.error("Pause job failure.");
return;
@@ -312,7 +328,8 @@ public class JobServiceImpl implements JobService {
} else if (job instanceof StreamingJob) {
return streamingJobOp;
}
- throw new GriffinException.BadRequestException(JOB_TYPE_DOES_NOT_SUPPORT);
+ throw new GriffinException.BadRequestException
+ (JOB_TYPE_DOES_NOT_SUPPORT);
}
private JobOperator getJobOperator(ProcessType type) {
@@ -321,18 +338,22 @@ public class JobServiceImpl implements JobService {
} else if (type == STREAMING) {
return streamingJobOp;
}
- throw new GriffinException.BadRequestException(MEASURE_TYPE_DOES_NOT_SUPPORT);
+ throw new GriffinException.BadRequestException
+ (MEASURE_TYPE_DOES_NOT_SUPPORT);
}
- TriggerKey getTriggerKeyIfValid(String qName, String qGroup) throws SchedulerException {
+ TriggerKey getTriggerKeyIfValid(String qName, String qGroup) throws
+ SchedulerException {
TriggerKey triggerKey = triggerKey(qName, qGroup);
if (factory.getScheduler().checkExists(triggerKey)) {
- throw new GriffinException.ConflictException(QUARTZ_JOB_ALREADY_EXIST);
+ throw new GriffinException.ConflictException
+ (QUARTZ_JOB_ALREADY_EXIST);
}
return triggerKey;
}
- List<? extends Trigger> getTriggers(String name, String group) throws SchedulerException {
+ List<? extends Trigger> getTriggers(String name, String group) throws
+ SchedulerException {
if (name == null || group == null) {
return null;
}
@@ -341,7 +362,8 @@ public class JobServiceImpl implements JobService {
return scheduler.getTriggersOfJob(jobKey);
}
- private JobState genJobState(AbstractJob job, String action) throws SchedulerException {
+ private JobState genJobState(AbstractJob job, String action) throws
+ SchedulerException {
JobOperator op = getJobOperator(job);
JobState state = op.getState(job, action);
job.setJobState(state);
@@ -352,7 +374,8 @@ public class JobServiceImpl implements JobService {
return genJobState(job, null);
}
- void addJob(TriggerKey tk, AbstractJob job, ProcessType type) throws Exception {
+ void addJob(TriggerKey tk, AbstractJob job, ProcessType type) throws
+ Exception {
JobDetail jobDetail = addJobDetail(tk, job);
Trigger trigger = genTriggerInstance(tk, jobDetail, job, type);
factory.getScheduler().scheduleJob(trigger);
@@ -381,27 +404,35 @@ public class JobServiceImpl implements JobService {
private GriffinMeasure getMeasureIfValid(Long measureId) {
- GriffinMeasure measure = measureRepo.findByIdAndDeleted(measureId, false);
+ GriffinMeasure measure = measureRepo.findByIdAndDeleted(measureId,
+ false);
if (measure == null) {
- LOGGER.warn("The measure id {} isn't valid. Maybe it doesn't exist or is external measure type.", measureId);
+ LOGGER.warn("The measure id {} isn't valid. Maybe it doesn't " +
+ "exist or is external measure type.",
+ measureId);
throw new GriffinException.BadRequestException(INVALID_MEASURE_ID);
}
return measure;
}
- private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, AbstractJob job, ProcessType type) {
+ private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, AbstractJob
+ job, ProcessType type) {
TriggerBuilder builder = newTrigger().withIdentity(tk).forJob(jd);
if (type == BATCH) {
TimeZone timeZone = getTimeZone(job.getTimeZone());
- return builder.withSchedule(cronSchedule(job.getCronExpression()).inTimeZone(timeZone)).build();
+ return builder.withSchedule(cronSchedule(job.getCronExpression())
+ .inTimeZone(timeZone)).build();
} else if (type == STREAMING) {
- return builder.startNow().withSchedule(simpleSchedule().withRepeatCount(0)).build();
+ return builder.startNow().withSchedule(simpleSchedule()
+ .withRepeatCount(0)).build();
}
- throw new GriffinException.BadRequestException(JOB_TYPE_DOES_NOT_SUPPORT);
+ throw new GriffinException.BadRequestException
+ (JOB_TYPE_DOES_NOT_SUPPORT);
}
- private JobDetail addJobDetail(TriggerKey triggerKey, AbstractJob job) throws SchedulerException {
+ private JobDetail addJobDetail(TriggerKey triggerKey, AbstractJob job)
+ throws SchedulerException {
Scheduler scheduler = factory.getScheduler();
JobKey jobKey = jobKey(triggerKey.getName(), triggerKey.getGroup());
JobDetail jobDetail;
@@ -409,7 +440,8 @@ public class JobServiceImpl implements JobService {
if (isJobKeyExist) {
jobDetail = scheduler.getJobDetail(jobKey);
} else {
- jobDetail = newJob(JobInstance.class).storeDurably().withIdentity(jobKey).build();
+ jobDetail = newJob(JobInstance.class).storeDurably().withIdentity
+ (jobKey).build();
}
setJobDataMap(jobDetail, job);
scheduler.addJob(jobDetail, isJobKeyExist);
@@ -429,8 +461,10 @@ public class JobServiceImpl implements JobService {
*
* @param measureId measure id
*/
- public void deleteJobsRelateToMeasure(Long measureId) throws SchedulerException {
- List<AbstractJob> jobs = jobRepo.findByMeasureIdAndDeleted(measureId, false);
+ public void deleteJobsRelateToMeasure(Long measureId) throws
+ SchedulerException {
+ List<AbstractJob> jobs = jobRepo.findByMeasureIdAndDeleted(measureId,
+ false);
if (CollectionUtils.isEmpty(jobs)) {
LOGGER.info("Measure id {} has no related jobs.", measureId);
return;
@@ -443,7 +477,8 @@ public class JobServiceImpl implements JobService {
@Scheduled(fixedDelayString = "${jobInstance.fixedDelay.in.milliseconds}")
public void syncInstancesOfAllJobs() {
- LivySessionStates.State[] states = {STARTING, NOT_STARTED, RECOVERING, IDLE, RUNNING, BUSY};
+ LivySessionStates.State[] states = {STARTING, NOT_STARTED, RECOVERING,
+ IDLE, RUNNING, BUSY};
List<JobInstanceBean> beans = instanceRepo.findByActiveState(states);
for (JobInstanceBean jobInstance : beans) {
syncInstancesOfJob(jobInstance);
@@ -451,7 +486,8 @@ public class JobServiceImpl implements JobService {
}
/**
- * call livy to update part of job instance table data associated with group and jobName in mysql.
+ * call livy to update part of job instance table data associated with group
+ * and jobName in mysql.
*
* @param instance job instance livy info
*/
@@ -459,17 +495,22 @@ public class JobServiceImpl implements JobService {
if (instance.getSessionId() == null) {
return;
}
- String uri = env.getProperty("livy.uri") + "/" + instance.getSessionId();
- TypeReference<HashMap<String, Object>> type = new TypeReference<HashMap<String, Object>>() {
- };
+ String uri = env.getProperty("livy.uri") + "/"
+ + instance.getSessionId();
+ TypeReference<HashMap<String, Object>> type =
+ new TypeReference<HashMap<String, Object>>() {
+ };
try {
String resultStr = restTemplate.getForObject(uri, String.class);
- HashMap<String, Object> resultMap = JsonUtil.toEntity(resultStr, type);
+ HashMap<String, Object> resultMap = JsonUtil.toEntity(resultStr,
+ type);
setJobInstanceIdAndUri(instance, resultMap);
} catch (ResourceAccessException e) {
- LOGGER.error("Your url may be wrong. Please check {}.\n {}", uri, e.getMessage());
+ LOGGER.error("Your url may be wrong. Please check {}.\n {}", uri, e
+ .getMessage());
} catch (HttpClientErrorException e) {
- LOGGER.warn("sessionId({}) appId({}) {}.", instance.getSessionId(), instance.getAppId(), e.getMessage());
+ LOGGER.warn("sessionId({}) appId({}) {}.", instance.getSessionId(),
+ instance.getAppId(), e.getMessage());
setStateByYarn(instance, e);
} catch (Exception e) {
LOGGER.error(e.getMessage());
@@ -477,11 +518,14 @@ public class JobServiceImpl implements JobService {
}
- private void setStateByYarn(JobInstanceBean instance, HttpClientErrorException e) {
+ private void setStateByYarn(JobInstanceBean instance,
+ HttpClientErrorException e) {
if (!checkStatus(instance, e)) {
int code = e.getStatusCode().value();
- boolean match = (code == 400 || code == 404) && instance.getAppId() != null;
- //this means your url is correct,but your param is wrong or livy session may be overdue.
+ boolean match = (code == 400 || code == 404)
+ && instance.getAppId() != null;
+ //this means your url is correct,but your param is wrong or livy
+ //session may be overdue.
if (match) {
setStateByYarn(instance);
}
@@ -490,21 +534,26 @@ public class JobServiceImpl implements JobService {
}
/**
- * Check instance status in case that session id is overdue and app id is null and so we cannot update instance state.
+ * Check instance status in case that session id is overdue and app id is
+ * null and so we cannot update instance state
+ * .
*
* @param instance job instance bean
* @param e HttpClientErrorException
* @return boolean
*/
- private boolean checkStatus(JobInstanceBean instance, HttpClientErrorException e) {
+ private boolean checkStatus(JobInstanceBean instance,
+ HttpClientErrorException e) {
int code = e.getStatusCode().value();
String appId = instance.getAppId();
String responseBody = e.getResponseBodyAsString();
Long sessionId = instance.getSessionId();
sessionId = sessionId != null ? sessionId : -1;
- // If code is 404 and appId is null and response body is like 'Session {id} not found',
- // this means instance may not be scheduled for a long time by spark for too many tasks. It may be dead.
- if (code == 404 && appId == null && (responseBody != null && responseBody.contains(sessionId.toString()))) {
+ // If code is 404 and appId is null and response body is like 'Session
+ // {id} not found',this means instance may not be scheduled for
+ // a long time by spark for too many tasks. It may be dead.
+ if (code == 404 && appId == null && (responseBody != null &&
+ responseBody.contains(sessionId.toString()))) {
instance.setState(DEAD);
instance.setDeleted(true);
instanceRepo.save(instance);
@@ -514,7 +563,8 @@ public class JobServiceImpl implements JobService {
}
private void setStateByYarn(JobInstanceBean instance) {
- LOGGER.warn("Spark session {} may be overdue! Now we use yarn to update state.", instance.getSessionId());
+ LOGGER.warn("Spark session {} may be overdue! " +
+ "Now we use yarn to update state.", instance.getSessionId());
String yarnUrl = env.getProperty("yarn.uri");
boolean success = YarnNetUtil.update(yarnUrl, instance);
if (!success) {
@@ -527,34 +577,43 @@ public class JobServiceImpl implements JobService {
}
- private void setJobInstanceIdAndUri(JobInstanceBean instance, HashMap<String, Object> resultMap) {
+ private void setJobInstanceIdAndUri(JobInstanceBean instance, HashMap<String
+ , Object> resultMap) {
if (resultMap != null) {
Object state = resultMap.get("state");
Object appId = resultMap.get("appId");
- instance.setState(state == null ? null : LivySessionStates.State.valueOf(state.toString().toUpperCase()));
+ instance.setState(state == null ? null : LivySessionStates.State
+ .valueOf(state.toString().toUpperCase
+ ()));
instance.setAppId(appId == null ? null : appId.toString());
- instance.setAppUri(appId == null ? null : env.getProperty("yarn.uri") + "/cluster/app/" + appId);
+ instance.setAppUri(appId == null ? null : env
+ .getProperty("yarn.uri") + " /cluster/app/ " + appId);
instanceRepo.save(instance);
}
}
public Boolean isJobHealthy(Long jobId) {
Pageable pageable = new PageRequest(0, 1, Sort.Direction.DESC, "tms");
- List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId, pageable);
- return !CollectionUtils.isEmpty(instances) && LivySessionStates.isHealthy(instances.get(0).getState());
+ List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId,
+ pageable);
+ return !CollectionUtils.isEmpty(instances) && LivySessionStates
+ .isHealthy(instances.get(0).getState());
}
@Override
public String getJobHdfsSinksPath(String jobName, long timestamp) {
- List<AbstractJob> jobList = jobRepo.findByJobNameAndDeleted(jobName, false);
+ List<AbstractJob> jobList = jobRepo.findByJobNameAndDeleted(
+ jobName, false);
if (jobList.size() == 0) {
return null;
}
if (jobList.get(0).getType().toLowerCase().equals("batch")) {
- return getSinksPath(ENV_BATCH) + "/" + jobName + "/" + timestamp + "";
+ return getSinksPath(ENV_BATCH)
+ + "/" + jobName + "/" + timestamp + "";
}
- return getSinksPath(ENV_STREAMING) + "/" + jobName + "/" + timestamp + "";
+ return getSinksPath(ENV_STREAMING)
+ + "/" + jobName + "/" + timestamp + "";
}
private String getSinksPath(String jsonString) {
@@ -563,8 +622,10 @@ public class JobServiceImpl implements JobService {
JSONArray persistArray = obj.getJSONArray("sinks");
for (int i = 0; i < persistArray.length(); i++) {
Object type = persistArray.getJSONObject(i).get("type");
- if (type instanceof String && "hdfs".equalsIgnoreCase(String.valueOf(type))) {
- return persistArray.getJSONObject(i).getJSONObject("config").getString("path");
+ if (type instanceof String
+ && "hdfs".equalsIgnoreCase(String.valueOf(type))) {
+ return persistArray.getJSONObject(i)
+ .getJSONObject("config").getString("path");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
index 19e746f..64478b8 100644
--- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
@@ -68,7 +68,8 @@ import org.springframework.web.client.RestTemplate;
@DisallowConcurrentExecution
@Component
public class SparkSubmitJob implements Job {
- private static final Logger LOGGER = LoggerFactory.getLogger(SparkSubmitJob.class);
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(SparkSubmitJob.class);
@Autowired
private JobInstanceRepo jobInstanceRepo;
@@ -99,7 +100,8 @@ public class SparkSubmitJob implements Job {
}
}
- private void updateJobInstanceState(JobExecutionContext context) throws IOException {
+ private void updateJobInstanceState(JobExecutionContext context) throws
+ IOException {
SimpleTrigger simpleTrigger = (SimpleTrigger) context.getTrigger();
int repeatCount = simpleTrigger.getRepeatCount();
int fireCount = simpleTrigger.getTimesTriggered();
@@ -111,10 +113,13 @@ public class SparkSubmitJob implements Job {
private String post2Livy() {
String result = null;
try {
- result = restTemplate.postForObject(livyUri, livyConfMap, String.class);
+ result = restTemplate.postForObject(livyUri, livyConfMap,
+ String.class);
LOGGER.info(result);
} catch (HttpClientErrorException e) {
- LOGGER.error("Post to livy ERROR. \n {} {}", e.getMessage(), e.getResponseBodyAsString());
+ LOGGER.error("Post to livy ERROR. \n {} {}",
+ e.getMessage(),
+ e.getResponseBodyAsString());
} catch (Exception e) {
LOGGER.error("Post to livy ERROR. {}", e.getMessage());
}
@@ -126,7 +131,8 @@ public class SparkSubmitJob implements Job {
return true;
}
for (SegmentPredicate segPredicate : predicates) {
- Predicator predicator = PredicatorFactory.newPredicateInstance(segPredicate);
+ Predicator predicator = PredicatorFactory
+ .newPredicateInstance(segPredicate);
try {
if (predicator != null && !predicator.predicate()) {
return false;
@@ -141,11 +147,14 @@ public class SparkSubmitJob implements Job {
private void initParam(JobDetail jd) throws IOException {
mPredicates = new ArrayList<>();
- jobInstance = jobInstanceRepo.findByPredicateName(jd.getJobDataMap().getString(PREDICATE_JOB_NAME));
- measure = toEntity(jd.getJobDataMap().getString(MEASURE_KEY), GriffinMeasure.class);
+ jobInstance = jobInstanceRepo.findByPredicateName(jd.getJobDataMap()
+ .getString(PREDICATE_JOB_NAME));
+ measure = toEntity(jd.getJobDataMap().getString(MEASURE_KEY),
+ GriffinMeasure.class);
livyUri = env.getProperty("livy.uri");
setPredicates(jd.getJobDataMap().getString(PREDICATES_KEY));
- // in order to keep metric name unique, we set job name as measure name at present
+ // in order to keep metric name unique, we set job name
+ // as measure name at present
measure.setName(jd.getJobDataMap().getString(JOB_NAME));
}
@@ -154,8 +163,9 @@ public class SparkSubmitJob implements Job {
if (StringUtils.isEmpty(json)) {
return;
}
- List<Map<String, Object>> maps = toEntity(json, new TypeReference<List<Map>>() {
- });
+ List<Map<String, Object>> maps = toEntity(json,
+ new TypeReference<List<Map>>() {
+ });
for (Map<String, Object> map : maps) {
SegmentPredicate sp = new SegmentPredicate();
sp.setType((String) map.get("type"));
@@ -195,8 +205,10 @@ public class SparkSubmitJob implements Job {
}
- private void saveJobInstance(JobDetail jd) throws SchedulerException, IOException {
- // If result is null, it may livy uri is wrong or livy parameter is wrong.
+ private void saveJobInstance(JobDetail jd) throws SchedulerException,
+ IOException {
+ // If result is null, it may livy uri is wrong
+ // or livy parameter is wrong.
String result = post2Livy();
String group = jd.getKey().getGroup();
String name = jd.getKey().getName();
@@ -205,9 +217,11 @@ public class SparkSubmitJob implements Job {
saveJobInstance(result, FOUND);
}
- private void saveJobInstance(String result, State state) throws IOException {
- TypeReference<HashMap<String, Object>> type = new TypeReference<HashMap<String, Object>>() {
- };
+ private void saveJobInstance(String result, State state)
+ throws IOException {
+ TypeReference<HashMap<String, Object>> type =
+ new TypeReference<HashMap<String, Object>>() {
+ };
Map<String, Object> resultMap = null;
if (result != null) {
resultMap = toEntity(result, type);
@@ -223,8 +237,10 @@ public class SparkSubmitJob implements Job {
Object status = resultMap.get("state");
Object id = resultMap.get("id");
Object appId = resultMap.get("appId");
- jobInstance.setState(status == null ? null : State.valueOf(status.toString().toUpperCase()));
- jobInstance.setSessionId(id == null ? null : Long.parseLong(id.toString()));
+ jobInstance.setState(status == null ? null : State.valueOf(status
+ .toString().toUpperCase()));
+ jobInstance.setSessionId(id == null ? null : Long.parseLong(id
+ .toString()));
jobInstance.setAppId(appId == null ? null : appId.toString());
}
}