You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2019/08/20 13:59:23 UTC
[griffin] branch master updated: [GRIFFIN-271] Before 0.6.0 milestone release, We need to check and tr…
This is an automated email from the ASF dual-hosted git repository.
guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new 255f147 [GRIFFIN-271] Before 0.6.0 milestone release, We need to check and tr…
255f147 is described below
commit 255f147c4484e9077083bc74b0a3af9bd83d4afb
Author: Eugene <li...@apache.org>
AuthorDate: Tue Aug 20 21:59:03 2019 +0800
[GRIFFIN-271] Before 0.6.0 milestone release, we need to check and trim current code against the Apache Griffin dev guide
Trim the current Java code according to the dev guide document.
Author: Eugene <li...@apache.org>
Closes #522 from toyboxman/g271.
---
.../apache/griffin/core/GriffinWebApplication.java | 4 +-
.../griffin/core/common/SimpleCORSFilter.java | 6 +-
.../griffin/core/config/EclipseLinkJpaConfig.java | 8 +-
.../org/apache/griffin/core/config/EnvConfig.java | 10 +-
.../apache/griffin/core/config/LoginConfig.java | 2 +-
.../griffin/core/config/PropertiesConfig.java | 25 ++--
.../griffin/core/config/SchedulerConfig.java | 3 +-
.../griffin/core/event/GriffinEventManager.java | 10 +-
.../org/apache/griffin/core/event/JobEvent.java | 24 ++--
.../core/exception/GriffinExceptionHandler.java | 16 +--
.../core/exception/GriffinExceptionMessage.java | 14 +--
.../griffin/core/job/BatchJobOperatorImpl.java | 56 ++++-----
.../griffin/core/job/FileExistPredicator.java | 11 +-
.../org/apache/griffin/core/job/JobController.java | 24 ++--
.../org/apache/griffin/core/job/JobInstance.java | 77 ++++++-------
.../org/apache/griffin/core/job/JobOperator.java | 6 +-
.../apache/griffin/core/job/JobServiceImpl.java | 7 +-
.../griffin/core/job/LivyTaskSubmitHelper.java | 128 ++++++++++-----------
.../org/apache/griffin/core/job/Predicator.java | 10 ++
.../apache/griffin/core/job/SparkSubmitJob.java | 85 ++++++++------
.../griffin/core/job/StreamingJobOperatorImpl.java | 70 +++++------
.../griffin/core/job/entity/AbstractJob.java | 23 ++--
.../griffin/core/job/entity/JobDataSegment.java | 6 +-
.../griffin/core/job/entity/JobInstanceBean.java | 2 +-
.../griffin/core/job/entity/LivySessionStates.java | 14 +--
.../griffin/core/job/entity/SegmentPredicate.java | 9 +-
.../griffin/core/job/entity/SegmentRange.java | 2 -
.../factory/AutowiringSpringBeanJobFactory.java | 14 ++-
.../core/job/factory/PredicatorFactory.java | 4 +-
.../griffin/core/job/repo/JobInstanceRepo.java | 4 +-
.../org/apache/griffin/core/job/repo/JobRepo.java | 2 +-
.../apache/griffin/core/login/LoginController.java | 2 +-
.../apache/griffin/core/login/LoginService.java | 8 ++
.../core/login/LoginServiceDefaultImpl.java | 3 +-
.../griffin/core/login/LoginServiceLdapImpl.java | 14 +--
.../core/login/ldap/SelfSignedSocketFactory.java | 2 +-
.../core/measure/ExternalMeasureOperatorImpl.java | 4 +-
.../griffin/core/measure/MeasureController.java | 6 +-
.../griffin/core/measure/MeasureOrgService.java | 4 +-
.../core/measure/MeasureOrgServiceImpl.java | 12 +-
.../griffin/core/measure/MeasureServiceImpl.java | 18 +--
.../measure/entity/AbstractAuditableEntity.java | 4 +
.../griffin/core/measure/entity/DataSource.java | 2 +-
.../griffin/core/measure/entity/EvaluateRule.java | 2 +-
.../griffin/core/measure/entity/Measure.java | 6 +-
.../apache/griffin/core/measure/entity/Rule.java | 4 +-
.../core/measure/entity/StreamingPreProcess.java | 4 +-
.../griffin/core/measure/repo/MeasureRepo.java | 6 +-
.../core/metastore/hive/HiveMetaStoreProxy.java | 4 +-
.../metastore/hive/HiveMetaStoreServiceImpl.java | 24 ++--
.../hive/HiveMetaStoreServiceJdbcImpl.java | 44 +++----
.../metastore/kafka/KafkaSchemaController.java | 2 +-
.../metastore/kafka/KafkaSchemaServiceImpl.java | 14 +--
.../griffin/core/metric/MetricController.java | 16 +--
.../griffin/core/metric/MetricServiceImpl.java | 34 +++---
.../apache/griffin/core/metric/MetricStore.java | 2 +-
.../griffin/core/metric/MetricStoreImpl.java | 86 +++++++-------
.../griffin/core/metric/model/MetricValue.java | 10 +-
.../java/org/apache/griffin/core/util/FSUtil.java | 15 +--
.../org/apache/griffin/core/util/FileUtil.java | 4 +-
.../org/apache/griffin/core/util/JsonUtil.java | 16 +--
.../org/apache/griffin/core/util/MeasureUtil.java | 10 +-
.../apache/griffin/core/util/PropertiesUtil.java | 4 +-
.../org/apache/griffin/core/util/TimeUtil.java | 18 +--
.../org/apache/griffin/core/util/YarnNetUtil.java | 14 +--
.../griffin/core/job/SparkSubmitJobTest.java | 7 +-
66 files changed, 565 insertions(+), 536 deletions(-)
diff --git a/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java b/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java
index 33fc049..62bdc1f 100644
--- a/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java
+++ b/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java
@@ -18,7 +18,6 @@ under the License.
*/
package org.apache.griffin.core;
-
import org.apache.griffin.core.common.SimpleCORSFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,12 +26,11 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
-
@SpringBootApplication
@EnableScheduling
public class GriffinWebApplication {
private static final Logger LOGGER = LoggerFactory
- .getLogger(GriffinWebApplication.class);
+ .getLogger(GriffinWebApplication.class);
public static void main(String[] args) {
SpringApplication.run(GriffinWebApplication.class, args);
diff --git a/service/src/main/java/org/apache/griffin/core/common/SimpleCORSFilter.java b/service/src/main/java/org/apache/griffin/core/common/SimpleCORSFilter.java
index bd39188..6c8e8c3 100644
--- a/service/src/main/java/org/apache/griffin/core/common/SimpleCORSFilter.java
+++ b/service/src/main/java/org/apache/griffin/core/common/SimpleCORSFilter.java
@@ -32,14 +32,14 @@ public class SimpleCORSFilter implements Filter {
public void doFilter(final ServletRequest req,
final ServletResponse res,
final FilterChain chain)
- throws IOException, ServletException {
+ throws IOException, ServletException {
HttpServletResponse response = (HttpServletResponse) res;
response.setHeader("Access-Control-Allow-Origin", "*");
response.setHeader("Access-Control-Allow-Methods",
- "POST, GET, OPTIONS, DELETE,PUT");
+ "POST, GET, OPTIONS, DELETE,PUT");
response.setHeader("Access-Control-Max-Age", "3600");
response.setHeader("Access-Control-Allow-Headers",
- "X-PINGOTHER, Origin, X-Requested-With, Content-Type, Accept");
+ "X-PINGOTHER, Origin, X-Requested-With, Content-Type, Accept");
chain.doFilter(req, res);
}
diff --git a/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java b/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java
index 1493569..1463d54 100644
--- a/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java
@@ -37,9 +37,9 @@ import org.springframework.transaction.jta.JtaTransactionManager;
@ComponentScan("org.apache.griffin.core")
public class EclipseLinkJpaConfig extends JpaBaseConfiguration {
protected EclipseLinkJpaConfig(
- DataSource ds, JpaProperties properties,
- ObjectProvider<JtaTransactionManager> jtm,
- ObjectProvider<TransactionManagerCustomizers> tmc) {
+ DataSource ds, JpaProperties properties,
+ ObjectProvider<JtaTransactionManager> jtm,
+ ObjectProvider<TransactionManagerCustomizers> tmc) {
super(ds, properties, jtm, tmc);
}
@@ -53,7 +53,7 @@ public class EclipseLinkJpaConfig extends JpaBaseConfiguration {
Map<String, Object> map = new HashMap<>();
map.put(PersistenceUnitProperties.WEAVING, "false");
map.put(PersistenceUnitProperties.DDL_GENERATION,
- "create-or-extend-tables");
+ "create-or-extend-tables");
return map;
}
}
diff --git a/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java b/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java
index 8c075a4..ef303a1 100644
--- a/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java
@@ -35,7 +35,7 @@ import org.springframework.core.io.ClassPathResource;
public class EnvConfig {
private static final Logger LOGGER = LoggerFactory
- .getLogger(EnvConfig.class);
+ .getLogger(EnvConfig.class);
public static String ENV_BATCH;
public static String ENV_STREAMING;
@@ -47,7 +47,7 @@ public class EnvConfig {
* @throws IOException io exception
*/
private static String readEnvFromResource(String path)
- throws IOException {
+ throws IOException {
if (path == null) {
LOGGER.warn("Parameter path is null.");
return null;
@@ -75,7 +75,7 @@ public class EnvConfig {
* @throws IOException io exception
*/
private static String readEnvFromAbsolutePath(String path)
- throws IOException {
+ throws IOException {
if (path == null) {
LOGGER.warn("Parameter path is null.");
return null;
@@ -103,7 +103,7 @@ public class EnvConfig {
* @throws IOException io exception
*/
static String getBatchEnv(String name, String defaultPath, String location)
- throws IOException {
+ throws IOException {
if (ENV_BATCH != null) {
return ENV_BATCH;
}
@@ -121,7 +121,7 @@ public class EnvConfig {
static String getStreamingEnv(String name,
String defaultPath,
String location)
- throws IOException {
+ throws IOException {
if (ENV_STREAMING != null) {
return ENV_STREAMING;
}
diff --git a/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java b/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java
index 8c9e4a2..c362755 100644
--- a/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java
@@ -53,7 +53,7 @@ public class LoginConfig {
return new LoginServiceDefaultImpl();
case "ldap":
return new LoginServiceLdapImpl(url, email, searchBase,
- searchPattern, sslSkipVerify, bindDN, bindPassword);
+ searchPattern, sslSkipVerify, bindDN, bindPassword);
default:
return null;
}
diff --git a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
index ebbd6cf..53c0dfc 100644
--- a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
@@ -42,11 +42,18 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
+/**
+ * PropertiesConfig is responsible for initializing configuration objects
+ * from property files.
+ *
+ * @see EnvConfig
+ * @see org.apache.griffin.core.util.PropertiesUtil
+ */
@Configuration
public class PropertiesConfig {
private static final Logger LOGGER = LoggerFactory
- .getLogger(PropertiesConfig.class);
+ .getLogger(PropertiesConfig.class);
public static Map<String, Object> livyConfMap;
@@ -55,12 +62,12 @@ public class PropertiesConfig {
private String envLocation;
public PropertiesConfig(
- @Value("${external.config.location}") String configLocation,
- @Value("${external.env.location}") String envLocation) {
+ @Value("${external.config.location}") String configLocation,
+ @Value("${external.env.location}") String envLocation) {
LOGGER.info("external.config.location : {}",
- configLocation != null ? configLocation : "null");
+ configLocation != null ? configLocation : "null");
LOGGER.info("external.env.location : {}",
- envLocation != null ? envLocation : "null");
+ envLocation != null ? envLocation : "null");
this.configLocation = configLocation;
this.envLocation = envLocation;
}
@@ -93,9 +100,9 @@ public class PropertiesConfig {
}
private static void genLivyConf(
- String name,
- String defaultPath,
- String location) throws IOException {
+ String name,
+ String defaultPath,
+ String location) throws IOException {
if (livyConfMap != null) {
return;
}
@@ -117,7 +124,7 @@ public class PropertiesConfig {
* @throws IOException io exception
*/
private static Map<String, Object> readPropertiesFromResource(String path)
- throws IOException {
+ throws IOException {
if (path == null) {
LOGGER.warn("Parameter path is null.");
return null;
diff --git a/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java b/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java
index 7b6af51..843e321 100644
--- a/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java
@@ -44,7 +44,7 @@ public class SchedulerConfig {
@Bean
public JobFactory jobFactory(ApplicationContext applicationContext) {
AutowiringSpringBeanJobFactory jobFactory =
- new AutowiringSpringBeanJobFactory();
+ new AutowiringSpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
}
@@ -60,5 +60,4 @@ public class SchedulerConfig {
return factory;
}
-
}
diff --git a/service/src/main/java/org/apache/griffin/core/event/GriffinEventManager.java b/service/src/main/java/org/apache/griffin/core/event/GriffinEventManager.java
index 2ebad57..e0dee89 100644
--- a/service/src/main/java/org/apache/griffin/core/event/GriffinEventManager.java
+++ b/service/src/main/java/org/apache/griffin/core/event/GriffinEventManager.java
@@ -43,11 +43,11 @@ public class GriffinEventManager {
void initializeListeners() {
List<GriffinHook> eventListeners = new ArrayList<>();
applicationContext.getBeansOfType(GriffinHook.class)
- .forEach((beanName, listener) -> {
- if (enabledListeners.contains(beanName)) {
- eventListeners.add(listener);
- }
- });
+ .forEach((beanName, listener) -> {
+ if (enabledListeners.contains(beanName)) {
+ eventListeners.add(listener);
+ }
+ });
this.eventListeners = eventListeners;
}
diff --git a/service/src/main/java/org/apache/griffin/core/event/JobEvent.java b/service/src/main/java/org/apache/griffin/core/event/JobEvent.java
index 5fdea0f..467cf28 100644
--- a/service/src/main/java/org/apache/griffin/core/event/JobEvent.java
+++ b/service/src/main/java/org/apache/griffin/core/event/JobEvent.java
@@ -32,29 +32,29 @@ public class JobEvent extends GriffinAbstractEvent<AbstractJob> {
public static JobEvent yieldJobEventBeforeCreation(AbstractJob source) {
return new JobEvent(source,
- EventType.CREATION_EVENT,
- EventSourceType.JOB,
- EventPointcutType.BEFORE);
+ EventType.CREATION_EVENT,
+ EventSourceType.JOB,
+ EventPointcutType.BEFORE);
}
public static JobEvent yieldJobEventAfterCreation(AbstractJob source) {
return new JobEvent(source,
- EventType.CREATION_EVENT,
- EventSourceType.JOB,
- EventPointcutType.AFTER);
+ EventType.CREATION_EVENT,
+ EventSourceType.JOB,
+ EventPointcutType.AFTER);
}
public static JobEvent yieldJobEventBeforeRemoval(AbstractJob source) {
return new JobEvent(source,
- EventType.REMOVAL_EVENT,
- EventSourceType.JOB,
- EventPointcutType.BEFORE);
+ EventType.REMOVAL_EVENT,
+ EventSourceType.JOB,
+ EventPointcutType.BEFORE);
}
public static JobEvent yieldJobEventAfterRemoval(AbstractJob source) {
return new JobEvent(source,
- EventType.REMOVAL_EVENT,
- EventSourceType.JOB,
- EventPointcutType.AFTER);
+ EventType.REMOVAL_EVENT,
+ EventSourceType.JOB,
+ EventPointcutType.AFTER);
}
}
diff --git a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java
index d987da7..5fb88fb 100644
--- a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java
+++ b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java
@@ -34,28 +34,28 @@ public class GriffinExceptionHandler {
@SuppressWarnings("rawtypes")
@ExceptionHandler(GriffinException.ServiceException.class)
public ResponseEntity handleGriffinExceptionOfServer(
- HttpServletRequest request,
- GriffinException.ServiceException e) {
+ HttpServletRequest request,
+ GriffinException.ServiceException e) {
String message = e.getMessage();
Throwable cause = e.getCause();
GriffinExceptionResponse body = new GriffinExceptionResponse(
- HttpStatus.INTERNAL_SERVER_ERROR,
- message, request.getRequestURI(), cause.getClass().getName());
+ HttpStatus.INTERNAL_SERVER_ERROR,
+ message, request.getRequestURI(), cause.getClass().getName());
return new ResponseEntity<>(body, HttpStatus.INTERNAL_SERVER_ERROR);
}
@SuppressWarnings("rawtypes")
@ExceptionHandler(GriffinException.class)
public ResponseEntity handleGriffinExceptionOfClient(
- HttpServletRequest request, GriffinException e) {
+ HttpServletRequest request, GriffinException e) {
ResponseStatus responseStatus = AnnotationUtils.findAnnotation(
- e.getClass(), ResponseStatus.class);
+ e.getClass(), ResponseStatus.class);
HttpStatus status = responseStatus.code();
String code = e.getMessage();
GriffinExceptionMessage message = GriffinExceptionMessage
- .valueOf(Integer.valueOf(code));
+ .valueOf(Integer.valueOf(code));
GriffinExceptionResponse body = new GriffinExceptionResponse(
- status, message, request.getRequestURI());
+ status, message, request.getRequestURI());
return new ResponseEntity<>(body, status);
}
}
diff --git a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
index 4f65248..5a39c66 100644
--- a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
+++ b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
@@ -23,9 +23,9 @@ public enum GriffinExceptionMessage {
//400, "Bad Request"
MEASURE_TYPE_DOES_NOT_MATCH(40001, "Property 'measure.type' does not match"
- + "the type of measure in request body"),
+ + "the type of measure in request body"),
INVALID_CONNECTOR_NAME(40002, "Property 'name' in 'connectors' " +
- "field is invalid"),
+ "field is invalid"),
MISSING_METRIC_NAME(40003, "Missing property 'metricName'"),
INVALID_JOB_NAME(40004, "Property 'job.name' is invalid"),
MISSING_BASELINE_CONFIG(40005, "Missing 'as.baseline' config in 'data.segments'"),
@@ -45,9 +45,9 @@ public enum GriffinExceptionMessage {
JOB_TYPE_DOES_NOT_SUPPORT(40011, "We don't support such job type."),
STREAMING_JOB_IS_RUNNING(40012, "There is no need to start again " +
- "as job is RUNNING."),
+ "as job is RUNNING."),
STREAMING_JOB_IS_STOPPED(40012, "There is no need to stop again " +
- "as job is STOPPED."),
+ "as job is STOPPED."),
JOB_IS_NOT_SCHEDULED(40013, "The job isn't scheduled."),
JOB_IS_NOT_IN_PAUSED_STATUS(40014, "The job isn't in paused status."),
@@ -66,9 +66,9 @@ public enum GriffinExceptionMessage {
NO_SUCH_JOB_ACTION(40404, "No such job action"),
JOB_KEY_DOES_NOT_EXIST(40405, "Job key which consists of " +
- "group and name does not exist."),
+ "group and name does not exist."),
ORGANIZATION_NAME_DOES_NOT_EXIST(40406, "Organization name " +
- "does not exist"),
+ "does not exist"),
HDFS_FILE_NOT_EXIST(40407, "Hadoop data file not exist"),
@@ -101,7 +101,7 @@ public enum GriffinExceptionMessage {
}
}
throw new IllegalArgumentException("No matching constant for ["
- + code + "]");
+ + code + "]");
}
diff --git a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
index 0210822..cdaa68c 100644
--- a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
@@ -71,7 +71,7 @@ import org.springframework.util.StringUtils;
@Service
public class BatchJobOperatorImpl implements JobOperator {
private static final Logger LOGGER = LoggerFactory
- .getLogger(BatchJobOperatorImpl.class);
+ .getLogger(BatchJobOperatorImpl.class);
@Autowired
private SchedulerFactoryBean factory;
@@ -85,7 +85,7 @@ public class BatchJobOperatorImpl implements JobOperator {
@Override
@Transactional(rollbackFor = Exception.class)
public AbstractJob add(AbstractJob job, GriffinMeasure measure)
- throws Exception {
+ throws Exception {
validateParams(job, measure);
String qName = jobService.getQuartzName(job);
String qGroup = jobService.getQuartzGroup();
@@ -120,20 +120,20 @@ public class BatchJobOperatorImpl implements JobOperator {
TriggerState state = getTriggerState(name, group);
if (state == null) {
throw new GriffinException.BadRequestException(
- JOB_IS_NOT_SCHEDULED);
+ JOB_IS_NOT_SCHEDULED);
}
/* If job is not in paused state,we can't start it
as it may be RUNNING.*/
if (state != PAUSED) {
throw new GriffinException.BadRequestException
- (JOB_IS_NOT_IN_PAUSED_STATUS);
+ (JOB_IS_NOT_IN_PAUSED_STATUS);
}
JobKey jobKey = jobKey(name, group);
try {
factory.getScheduler().resumeJob(jobKey);
} catch (SchedulerException e) {
throw new GriffinException.ServiceException(
- "Failed to start job.", e);
+ "Failed to start job.", e);
}
}
@@ -151,14 +151,14 @@ public class BatchJobOperatorImpl implements JobOperator {
@Override
public JobHealth getHealth(JobHealth jobHealth, AbstractJob job)
- throws SchedulerException {
+ throws SchedulerException {
List<? extends Trigger> triggers = jobService
- .getTriggers(job.getName(), job.getGroup());
+ .getTriggers(job.getName(), job.getGroup());
if (!CollectionUtils.isEmpty(triggers)) {
jobHealth.setJobCount(jobHealth.getJobCount() + 1);
if (jobService.isJobHealthy(job.getId())) {
jobHealth.setHealthyJobCount(
- jobHealth.getHealthyJobCount() + 1);
+ jobHealth.getHealthyJobCount() + 1);
}
}
return jobHealth;
@@ -166,7 +166,7 @@ public class BatchJobOperatorImpl implements JobOperator {
@Override
public JobState getState(AbstractJob job, String action)
- throws SchedulerException {
+ throws SchedulerException {
JobState jobState = new JobState();
Scheduler scheduler = factory.getScheduler();
if (job.getGroup() == null || job.getName() == null) {
@@ -182,9 +182,9 @@ public class BatchJobOperatorImpl implements JobOperator {
}
private void setTriggerTime(AbstractJob job, JobState jobState)
- throws SchedulerException {
+ throws SchedulerException {
List<? extends Trigger> triggers = jobService
- .getTriggers(job.getName(), job.getGroup());
+ .getTriggers(job.getName(), job.getGroup());
// If triggers are empty, in Griffin it means job is completed whose
// trigger state is NONE or not scheduled.
if (CollectionUtils.isEmpty(triggers)) {
@@ -194,9 +194,9 @@ public class BatchJobOperatorImpl implements JobOperator {
Date nextFireTime = trigger.getNextFireTime();
Date previousFireTime = trigger.getPreviousFireTime();
jobState.setNextFireTime(nextFireTime != null ?
- nextFireTime.getTime() : -1);
+ nextFireTime.getTime() : -1);
jobState.setPreviousFireTime(previousFireTime != null ?
- previousFireTime.getTime() : -1);
+ previousFireTime.getTime() : -1);
}
/**
@@ -225,7 +225,7 @@ public class BatchJobOperatorImpl implements JobOperator {
private TriggerState getTriggerState(String name, String group) {
try {
List<? extends Trigger> triggers = jobService.getTriggers(name,
- group);
+ group);
if (CollectionUtils.isEmpty(triggers)) {
return null;
}
@@ -234,7 +234,7 @@ public class BatchJobOperatorImpl implements JobOperator {
} catch (SchedulerException e) {
LOGGER.error("Failed to delete job", e);
throw new GriffinException
- .ServiceException("Failed to delete job", e);
+ .ServiceException("Failed to delete job", e);
}
}
@@ -254,7 +254,7 @@ public class BatchJobOperatorImpl implements JobOperator {
} catch (Exception e) {
LOGGER.error("Job schedule happens exception.", e);
throw new GriffinException.ServiceException("Job schedule " +
- "happens exception.", e);
+ "happens exception.", e);
}
}
@@ -263,7 +263,7 @@ public class BatchJobOperatorImpl implements JobOperator {
for (JobInstanceBean instance : instances) {
if (!instance.isPredicateDeleted()) {
deleteJob(instance.getPredicateGroup(), instance
- .getPredicateName());
+ .getPredicateName());
instance.setPredicateDeleted(true);
if (instance.getState().equals(LivySessionStates.State.FINDING)) {
instance.setState(LivySessionStates.State.NOT_FOUND);
@@ -277,7 +277,7 @@ public class BatchJobOperatorImpl implements JobOperator {
JobKey jobKey = new JobKey(name, group);
if (!scheduler.checkExists(jobKey)) {
LOGGER.info("Job({},{}) does not exist.", jobKey.getGroup(), jobKey
- .getName());
+ .getName());
return;
}
scheduler.deleteJob(jobKey);
@@ -292,9 +292,9 @@ public class BatchJobOperatorImpl implements JobOperator {
JobKey jobKey = new JobKey(name, group);
if (!scheduler.checkExists(jobKey)) {
LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), jobKey
- .getName());
+ .getName());
throw new GriffinException.NotFoundException
- (JOB_KEY_DOES_NOT_EXIST);
+ (JOB_KEY_DOES_NOT_EXIST);
}
scheduler.pauseJob(jobKey);
}
@@ -326,7 +326,7 @@ public class BatchJobOperatorImpl implements JobOperator {
}
} catch (SchedulerException e) {
LOGGER.error("Failed to pause predicate job({},{}).", pGroup,
- pName);
+ pName);
status = false;
}
return status;
@@ -338,16 +338,16 @@ public class BatchJobOperatorImpl implements JobOperator {
}
if (!isValidCronExpression(job.getCronExpression())) {
throw new GriffinException.BadRequestException
- (INVALID_CRON_EXPRESSION);
+ (INVALID_CRON_EXPRESSION);
}
if (!isValidBaseLine(job.getSegments())) {
throw new GriffinException.BadRequestException
- (MISSING_BASELINE_CONFIG);
+ (MISSING_BASELINE_CONFIG);
}
List<String> names = getConnectorNames(measure);
if (!isValidConnectorNames(job.getSegments(), names)) {
throw new GriffinException.BadRequestException
- (INVALID_CONNECTOR_NAME);
+ (INVALID_CONNECTOR_NAME);
}
}
@@ -371,7 +371,7 @@ public class BatchJobOperatorImpl implements JobOperator {
}
}
LOGGER.warn("Please set segment timestamp baseline " +
- "in as.baseline field.");
+ "in as.baseline field.");
return false;
}
@@ -383,16 +383,16 @@ public class BatchJobOperatorImpl implements JobOperator {
String dcName = segment.getDataConnectorName();
sets.add(dcName);
boolean exist = names.stream().anyMatch(name -> name.equals
- (dcName));
+ (dcName));
if (!exist) {
LOGGER.warn("Param {} is a illegal string. " +
- "Please input one of strings in {}.", dcName, names);
+ "Please input one of strings in {}.", dcName, names);
return false;
}
}
if (sets.size() < segments.size()) {
LOGGER.warn("Connector names in job data segment " +
- "cannot duplicate.");
+ "cannot duplicate.");
return false;
}
return true;
diff --git a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
index 703a837..1a9209b 100644
--- a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
+++ b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
public class FileExistPredicator implements Predicator {
private static final Logger LOGGER = LoggerFactory
- .getLogger(FileExistPredicator.class);
+ .getLogger(FileExistPredicator.class);
private static final String PREDICT_PATH = "path";
private static final String PREDICT_ROOT_PATH = "root.path";
@@ -49,15 +49,14 @@ public class FileExistPredicator implements Predicator {
Map<String, Object> config = predicate.getConfigMap();
String[] paths = null;
String rootPath = null;
- if (config != null && !StringUtils.isEmpty((String) config.get
- (PREDICT_PATH))) {
- paths = ((String) config.get(PREDICT_PATH)).split
- (PATH_CONNECTOR_CHARACTER);
+ if (config != null && !StringUtils.isEmpty((String) config.get(PREDICT_PATH))) {
+ paths = ((String) config.get(PREDICT_PATH))
+ .split(PATH_CONNECTOR_CHARACTER);
rootPath = (String) config.get(PREDICT_ROOT_PATH);
}
if (ArrayUtils.isEmpty(paths) || StringUtils.isEmpty(rootPath)) {
LOGGER.error("Predicate path is null.Please check predicates " +
- "config root.path and path.");
+ "config root.path and path.");
throw new NullPointerException();
}
for (String path : paths) {
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobController.java b/service/src/main/java/org/apache/griffin/core/job/JobController.java
index aca90d2..3b52ee1 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobController.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobController.java
@@ -51,7 +51,7 @@ public class JobController {
@RequestMapping(value = "/jobs", method = RequestMethod.GET)
public List<AbstractJob> getJobs(@RequestParam(value = "type",
- defaultValue = "") String type) {
+ defaultValue = "") String type) {
return jobService.getAliveJobs(type);
}
@@ -69,30 +69,30 @@ public class JobController {
@RequestMapping(value = "/jobs/{id}", method = RequestMethod.PUT)
@ResponseStatus(HttpStatus.OK)
public AbstractJob onActions(
- @PathVariable("id") Long jobId,
- @RequestParam String action) throws Exception {
+ @PathVariable("id") Long jobId,
+ @RequestParam String action) throws Exception {
return jobService.onAction(jobId, action);
}
@RequestMapping(value = "/jobs", method = RequestMethod.DELETE)
@ResponseStatus(HttpStatus.NO_CONTENT)
public void deleteJob(@RequestParam("jobName") String jobName)
- throws SchedulerException {
+ throws SchedulerException {
jobService.deleteJob(jobName);
}
@RequestMapping(value = "/jobs/{id}", method = RequestMethod.DELETE)
@ResponseStatus(HttpStatus.NO_CONTENT)
public void deleteJob(@PathVariable("id") Long id)
- throws SchedulerException {
+ throws SchedulerException {
jobService.deleteJob(id);
}
@RequestMapping(value = "/jobs/instances", method = RequestMethod.GET)
public List<JobInstanceBean> findInstancesOfJob(
- @RequestParam("jobId") Long id,
- @RequestParam("page") int page,
- @RequestParam("size") int size) {
+ @RequestParam("jobId") Long id,
+ @RequestParam("page") int page,
+ @RequestParam("size") int size) {
return jobService.findInstancesOfJob(id, page, size);
}
@@ -115,10 +115,10 @@ public class JobController {
InputStreamResource resource = new InputStreamResource(
FSUtil.getMissSampleInputStream(path));
return ResponseEntity.ok().
- header("content-disposition",
- "attachment; filename = sampleMissingData.json")
- .contentType(MediaType.APPLICATION_OCTET_STREAM)
- .body(resource);
+ header("content-disposition",
+ "attachment; filename = sampleMissingData.json")
+ .contentType(MediaType.APPLICATION_OCTET_STREAM)
+ .body(resource);
}
@RequestMapping(value = "/jobs/trigger/{id}", method = RequestMethod.POST)
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
index c721d27..3b0f768 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
@@ -78,7 +78,7 @@ import org.springframework.transaction.annotation.Transactional;
@DisallowConcurrentExecution
public class JobInstance implements Job {
private static final Logger LOGGER = LoggerFactory
- .getLogger(JobInstance.class);
+ .getLogger(JobInstance.class);
public static final String MEASURE_KEY = "measure";
public static final String PREDICATES_KEY = "predicts";
public static final String PREDICATE_JOB_NAME = "predicateJobName";
@@ -88,7 +88,7 @@ public class JobInstance implements Job {
public static final String INTERVAL = "interval";
public static final String REPEAT = "repeat";
public static final String CHECK_DONEFILE_SCHEDULE =
- "checkdonefile.schedule";
+ "checkdonefile.schedule";
@Autowired
private SchedulerFactoryBean factory;
@@ -119,7 +119,7 @@ public class JobInstance implements Job {
}
private void initParam(JobExecutionContext context)
- throws SchedulerException {
+ throws SchedulerException {
mPredicates = new ArrayList<>();
JobDetail jobDetail = context.getJobDetail();
Long jobId = jobDetail.getJobDataMap().getLong(GRIFFIN_JOB_ID);
@@ -135,18 +135,17 @@ public class JobInstance implements Job {
@SuppressWarnings("unchecked")
private void setJobStartTime(JobDetail jobDetail)
- throws SchedulerException {
+ throws SchedulerException {
Scheduler scheduler = factory.getScheduler();
JobKey jobKey = jobDetail.getKey();
List<Trigger> triggers =
- (List<Trigger>) scheduler.getTriggersOfJob(jobKey);
+ (List<Trigger>) scheduler.getTriggersOfJob(jobKey);
Date triggerTime = triggers.get(0).getPreviousFireTime();
jobStartTime = triggerTime.getTime();
}
-
private void setSourcesPartitionsAndPredicates(List<DataSource> sources)
- throws Exception {
+ throws Exception {
boolean isFirstBaseline = true;
for (JobDataSegment jds : job.getSegments()) {
if (jds.isAsTsBaseline() && isFirstBaseline) {
@@ -162,17 +161,16 @@ public class JobInstance implements Job {
}
private void setDataSourcePartitions(JobDataSegment jds, DataSource ds)
- throws Exception {
+ throws Exception {
List<DataConnector> connectors = ds.getConnectors();
for (DataConnector dc : connectors) {
setDataConnectorPartitions(jds, dc);
}
}
-
private void setDataConnectorPartitions(
- JobDataSegment jds,
- DataConnector dc) throws Exception {
+ JobDataSegment jds,
+ DataConnector dc) throws Exception {
String dcName = jds.getDataConnectorName();
if (dcName.equals(dc.getName())) {
Long[] sampleTs = genSampleTs(jds.getSegmentRange(), dc);
@@ -193,7 +191,7 @@ public class JobInstance implements Job {
Long range = TimeUtil.str2Long(segRange.getLength());
String unit = dc.getDataUnit();
Long dataUnit = TimeUtil.str2Long(StringUtils.isEmpty(unit) ? dc
- .getDefaultDataUnit() : unit);
+ .getDefaultDataUnit() : unit);
//offset usually is negative
Long dataStartTime = jobStartTime + offset;
if (range < 0) {
@@ -221,8 +219,8 @@ public class JobInstance implements Job {
List<SegmentPredicate> predicates = dc.getPredicates();
for (SegmentPredicate predicate : predicates) {
genConfMap(predicate.getConfigMap(),
- sampleTs,
- dc.getDataTimeZone());
+ sampleTs,
+ dc.getDataTimeZone());
//Do not forget to update origin string config
predicate.setConfigMap(predicate.getConfigMap());
mPredicates.add(predicate);
@@ -234,7 +232,6 @@ public class JobInstance implements Job {
dc.setConfigMap(dc.getConfigMap());
}
-
/**
* @param conf config map
* @param sampleTs collection of data split start timestamp
@@ -244,9 +241,8 @@ public class JobInstance implements Job {
* or like {"path": "/year=2017/month=11/dt=15/hour=09/_DONE
* ,/year=2017/month=11/dt=15/hour=10/_DONE"}
*/
-
private void genConfMap(Map<String, Object> conf, Long[] sampleTs, String
- timezone) {
+ timezone) {
if (conf == null) {
LOGGER.warn("Predicate config is null.");
return;
@@ -261,29 +257,28 @@ public class JobInstance implements Job {
}
for (Long timestamp : sampleTs) {
set.add(TimeUtil.format(value, timestamp,
- TimeUtil.getTimeZone(timezone)));
+ TimeUtil.getTimeZone(timezone)));
}
conf.put(entry.getKey(), StringUtils.join(set,
- PATH_CONNECTOR_CHARACTER));
+ PATH_CONNECTOR_CHARACTER));
}
}
}
@SuppressWarnings("unchecked")
private void createJobInstance(Map<String, Object> confMap)
- throws Exception {
+ throws Exception {
confMap = checkConfMap(confMap != null ? confMap : new HashMap<>());
Map<String, Object> config = (Map<String, Object>) confMap
- .get(CHECK_DONEFILE_SCHEDULE);
+ .get(CHECK_DONEFILE_SCHEDULE);
Long interval = TimeUtil.str2Long((String) config.get(INTERVAL));
Integer repeat = Integer.valueOf(config.get(REPEAT).toString());
String groupName = "PG";
- String jobName = job.getJobName() + "_predicate_" + System
- .currentTimeMillis();
+ String jobName = job.getJobName() + "_predicate_"
+ + System.currentTimeMillis();
TriggerKey tk = triggerKey(jobName, groupName);
if (factory.getScheduler().checkExists(tk)) {
- throw new GriffinException.ConflictException
- (QUARTZ_JOB_ALREADY_EXIST);
+ throw new GriffinException.ConflictException(QUARTZ_JOB_ALREADY_EXIST);
}
String triggerKey = (String) confMap.get(TRIGGER_KEY);
saveJobInstance(jobName, groupName, triggerKey);
@@ -293,7 +288,7 @@ public class JobInstance implements Job {
@SuppressWarnings("unchecked")
Map<String, Object> checkConfMap(Map<String, Object> confMap) {
Map<String, Object> config = (Map<String, Object>) confMap.get
- (CHECK_DONEFILE_SCHEDULE);
+ (CHECK_DONEFILE_SCHEDULE);
String interval = env.getProperty("predicate.job.interval");
interval = interval != null ? interval : "5m";
String repeat = env.getProperty("predicate.job.repeat.count");
@@ -316,38 +311,36 @@ public class JobInstance implements Job {
private void saveJobInstance(String pName, String pGroup, String triggerKey) {
ProcessType type = measure.getProcessType() == BATCH ? BATCH :
- STREAMING;
+ STREAMING;
Long tms = System.currentTimeMillis();
String expired = env.getProperty("jobInstance.expired.milliseconds");
Long expireTms = Long.valueOf(expired != null ? expired : "604800000")
- + tms;
+ + tms;
JobInstanceBean instance = new JobInstanceBean(FINDING, pName, pGroup,
- tms, expireTms, type);
+ tms, expireTms, type);
instance.setJob(job);
instance.setTriggerKey(triggerKey);
instanceRepo.save(instance);
}
-
private void createJobInstance(TriggerKey tk, Long interval, Integer
- repeatCount, String pJobName) throws Exception {
+ repeatCount, String pJobName) throws Exception {
JobDetail jobDetail = addJobDetail(tk, pJobName);
Trigger trigger = genTriggerInstance(tk, jobDetail, interval,
- repeatCount);
+ repeatCount);
factory.getScheduler().scheduleJob(trigger);
}
-
private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, Long
- interval, Integer repeatCount) {
+ interval, Integer repeatCount) {
return newTrigger().withIdentity(tk).forJob(jd).startNow()
- .withSchedule(simpleSchedule().withIntervalInMilliseconds
- (interval).withRepeatCount(repeatCount))
- .build();
+ .withSchedule(simpleSchedule().withIntervalInMilliseconds
+ (interval).withRepeatCount(repeatCount))
+ .build();
}
private JobDetail addJobDetail(TriggerKey tk, String pJobName)
- throws SchedulerException, IOException {
+ throws SchedulerException, IOException {
Scheduler scheduler = factory.getScheduler();
JobKey jobKey = jobKey(tk.getName(), tk.getGroup());
JobDetail jobDetail;
@@ -356,9 +349,9 @@ public class JobInstance implements Job {
jobDetail = scheduler.getJobDetail(jobKey);
} else {
jobDetail = newJob(SparkSubmitJob.class)
- .storeDurably()
- .withIdentity(jobKey)
- .build();
+ .storeDurably()
+ .withIdentity(jobKey)
+ .build();
}
setJobDataMap(jobDetail, pJobName);
scheduler.addJob(jobDetail, isJobKeyExist);
@@ -366,7 +359,7 @@ public class JobInstance implements Job {
}
private void setJobDataMap(JobDetail jobDetail, String pJobName)
- throws IOException {
+ throws IOException {
JobDataMap dataMap = jobDetail.getJobDataMap();
preProcessMeasure();
String result = toJson(measure);
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobOperator.java b/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
index 81c3b17..05279f9 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
@@ -27,7 +27,7 @@ import org.quartz.SchedulerException;
public interface JobOperator {
AbstractJob add(AbstractJob job, GriffinMeasure measure)
- throws Exception;
+ throws Exception;
void start(AbstractJob job) throws Exception;
@@ -36,8 +36,8 @@ public interface JobOperator {
void delete(AbstractJob job) throws SchedulerException;
JobHealth getHealth(JobHealth jobHealth, AbstractJob job)
- throws SchedulerException;
+ throws SchedulerException;
JobState getState(AbstractJob job, String action)
- throws SchedulerException;
+ throws SchedulerException;
}
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
index bd8988c..f41eb2a 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
@@ -101,7 +101,6 @@ import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.ResourceAccessException;
-import org.springframework.web.client.RestTemplate;
@Service
public class JobServiceImpl implements JobService {
@@ -686,9 +685,9 @@ public class JobServiceImpl implements JobService {
JobKey jobKey = jobKey(job.getName(), job.getGroup());
if (scheduler.checkExists(jobKey)) {
Trigger trigger = TriggerBuilder.newTrigger()
- .forJob(jobKey)
- .startNow()
- .build();
+ .forJob(jobKey)
+ .startNow()
+ .build();
scheduler.scheduleJob(trigger);
return trigger.getKey().toString();
} else {
diff --git a/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java b/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java
index d9ee9e8..cf50527 100644
--- a/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java
+++ b/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java
@@ -19,8 +19,26 @@ under the License.
package org.apache.griffin.core.job;
+import static org.apache.griffin.core.config.PropertiesConfig.livyConfMap;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_FOUND;
+import static org.apache.griffin.core.util.JsonUtil.toEntity;
+import static org.apache.griffin.core.util.JsonUtil.toJsonWithFormat;
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.PostConstruct;
+
import org.apache.commons.collections.map.HashedMap;
import org.quartz.JobDetail;
import org.slf4j.Logger;
@@ -35,29 +53,13 @@ import org.springframework.security.kerberos.client.KerberosRestTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
-import javax.annotation.PostConstruct;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.apache.griffin.core.config.PropertiesConfig.livyConfMap;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_FOUND;
-import static org.apache.griffin.core.util.JsonUtil.toEntity;
-import static org.apache.griffin.core.util.JsonUtil.toJsonWithFormat;
-
@Component
public class LivyTaskSubmitHelper {
-
- private static final Logger logger = LoggerFactory.getLogger(LivyTaskSubmitHelper.class);
-
+ private static final Logger LOGGER = LoggerFactory.getLogger(LivyTaskSubmitHelper.class);
private static final String REQUEST_BY_HEADER = "X-Requested-By";
+ public static final int DEFAULT_QUEUE_SIZE = 20000;
+ private static final int SLEEP_TIME = 300;
+
private SparkSubmitJob sparkSubmitJob;
private ConcurrentMap<Long, Integer> taskAppIdMap = new ConcurrentHashMap<>();
// Current number of tasks
@@ -66,8 +68,6 @@ public class LivyTaskSubmitHelper {
private RestTemplate restTemplate = new RestTemplate();
// queue for pub or sub
private BlockingQueue<JobDetail> queue;
- public static final int DEFAULT_QUEUE_SIZE = 20000;
- private static final int SLEEP_TIME = 300;
private String uri;
@Value("${livy.task.max.concurrent.count:20}")
@@ -75,7 +75,6 @@ public class LivyTaskSubmitHelper {
@Value("${livy.task.submit.interval.second:3}")
private int batchIntervalSecond;
-
@Autowired
private Environment env;
@@ -86,7 +85,7 @@ public class LivyTaskSubmitHelper {
public void init() {
startWorker();
uri = env.getProperty("livy.uri");
- logger.info("Livy uri : {}", uri);
+ LOGGER.info("Livy uri : {}", uri);
}
public LivyTaskSubmitHelper() {
@@ -110,20 +109,19 @@ public class LivyTaskSubmitHelper {
*/
public void addTaskToWaitingQueue(JobDetail jd) throws IOException {
if (jd == null) {
- logger.warn("task is blank, workerNamePre: {}", workerNamePre);
+ LOGGER.warn("task is blank, workerNamePre: {}", workerNamePre);
return;
}
if (queue.remainingCapacity() <= 0) {
- logger.warn("task is discard, workerNamePre: {}, task: {}", workerNamePre, jd);
+ LOGGER.warn("task is discard, workerNamePre: {}, task: {}", workerNamePre, jd);
sparkSubmitJob.saveJobInstance(null, NOT_FOUND);
return;
}
queue.add(jd);
-
- logger.info("add_task_to_waiting_queue_success, workerNamePre: {}, task: {}",
- workerNamePre, jd);
+ LOGGER.info("add_task_to_waiting_queue_success, workerNamePre: {}, task: {}",
+ workerNamePre, jd);
}
/**
@@ -141,7 +139,7 @@ public class LivyTaskSubmitHelper {
while (true) {
try {
if (curConcurrentTaskNum.get() < maxConcurrentTaskCount
- && (System.currentTimeMillis() - insertTime) >= batchIntervalSecond * 1000) {
+ && (System.currentTimeMillis() - insertTime) >= batchIntervalSecond * 1000) {
JobDetail jd = queue.take();
sparkSubmitJob.saveJobInstance(jd);
insertTime = System.currentTimeMillis();
@@ -149,7 +147,7 @@ public class LivyTaskSubmitHelper {
Thread.sleep(SLEEP_TIME);
}
} catch (Exception e) {
- logger.error("Async_worker_doTask_failed, {}", e.getMessage(), e);
+ LOGGER.error("Async_worker_doTask_failed, {}", e.getMessage(), e);
es.execute(this);
}
}
@@ -181,12 +179,12 @@ public class LivyTaskSubmitHelper {
}
protected Map<String, Object> retryLivyGetAppId(String result, int appIdRetryCount)
- throws IOException {
+ throws IOException {
int retryCount = appIdRetryCount;
TypeReference<HashMap<String, Object>> type =
- new TypeReference<HashMap<String, Object>>() {
- };
+ new TypeReference<HashMap<String, Object>>() {
+ };
Map<String, Object> resultMap = toEntity(result, type);
if (retryCount <= 0) {
@@ -206,10 +204,10 @@ public class LivyTaskSubmitHelper {
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
- logger.error(e.getMessage(), e);
+ LOGGER.error(e.getMessage(), e);
}
resultMap = getResultByLivyId(livyBatchesId, type);
- logger.info("retry get livy resultMap: {}, batches id : {}", resultMap, livyBatchesId);
+ LOGGER.info("retry get livy resultMap: {}, batches id : {}", resultMap, livyBatchesId);
if (resultMap.get("appId") != null) {
break;
@@ -220,104 +218,102 @@ public class LivyTaskSubmitHelper {
}
private Map<String, Object> getResultByLivyId(Object livyBatchesId, TypeReference<HashMap<String, Object>> type)
- throws IOException {
+ throws IOException {
Map<String, Object> resultMap = new HashedMap();
String livyUri = uri + "/" + livyBatchesId;
String result = getFromLivy(livyUri);
- logger.info(result);
+ LOGGER.info(result);
return result == null ? resultMap : toEntity(result, type);
}
public String postToLivy(String uri) {
- logger.info("Post To Livy URI is: " + uri);
+ LOGGER.info("Post To Livy URI is: " + uri);
String needKerberos = env.getProperty("livy.need.kerberos");
- logger.info("Need Kerberos:" + needKerberos);
+ LOGGER.info("Need Kerberos:" + needKerberos);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
- headers.set(REQUEST_BY_HEADER,"admin");
+ headers.set(REQUEST_BY_HEADER, "admin");
if (needKerberos == null || needKerberos.isEmpty()) {
- logger.error("The property \"livy.need.kerberos\" is empty");
+ LOGGER.error("The property \"livy.need.kerberos\" is empty");
return null;
}
if (needKerberos.equalsIgnoreCase("false")) {
- logger.info("The livy server doesn't need Kerberos Authentication");
+ LOGGER.info("The livy server doesn't need Kerberos Authentication");
String result = null;
try {
-
- HttpEntity<String> springEntity = new HttpEntity<>(toJsonWithFormat(livyConfMap),headers);
- result = restTemplate.postForObject(uri,springEntity,String.class);
-
- logger.info(result);
+ HttpEntity<String> springEntity = new HttpEntity<>(toJsonWithFormat(livyConfMap), headers);
+ result = restTemplate.postForObject(uri, springEntity, String.class);
+ LOGGER.info(result);
} catch (JsonProcessingException e) {
- logger.error("Post to livy ERROR. \n {}", e.getMessage());
+ LOGGER.error("Post to livy ERROR. \n {}", e.getMessage());
}
return result;
} else {
- logger.info("The livy server needs Kerberos Authentication");
+ LOGGER.info("The livy server needs Kerberos Authentication");
String userPrincipal = env.getProperty("livy.server.auth.kerberos.principal");
String keyTabLocation = env.getProperty("livy.server.auth.kerberos.keytab");
- logger.info("principal:{}, lcoation:{}", userPrincipal, keyTabLocation);
+ LOGGER.info("principal:{}, lcoation:{}", userPrincipal, keyTabLocation);
KerberosRestTemplate restTemplate = new KerberosRestTemplate(keyTabLocation, userPrincipal);
HttpEntity<String> springEntity = null;
try {
springEntity = new HttpEntity<>(toJsonWithFormat(livyConfMap), headers);
} catch (JsonProcessingException e) {
- e.printStackTrace();
+ LOGGER.error("Json Parsing failed, {}", e.getMessage(), e);
}
String result = restTemplate.postForObject(uri, springEntity, String.class);
- logger.info(result);
+ LOGGER.info(result);
return result;
}
}
public String getFromLivy(String uri) {
- logger.info("Get From Livy URI is: " + uri);
+ LOGGER.info("Get From Livy URI is: " + uri);
String needKerberos = env.getProperty("livy.need.kerberos");
- logger.info("Need Kerberos:" + needKerberos);
+ LOGGER.info("Need Kerberos:" + needKerberos);
if (needKerberos == null || needKerberos.isEmpty()) {
- logger.error("The property \"livy.need.kerberos\" is empty");
+ LOGGER.error("The property \"livy.need.kerberos\" is empty");
return null;
}
if (needKerberos.equalsIgnoreCase("false")) {
- logger.info("The livy server doesn't need Kerberos Authentication");
+ LOGGER.info("The livy server doesn't need Kerberos Authentication");
return restTemplate.getForObject(uri, String.class);
} else {
- logger.info("The livy server needs Kerberos Authentication");
+ LOGGER.info("The livy server needs Kerberos Authentication");
String userPrincipal = env.getProperty("livy.server.auth.kerberos.principal");
String keyTabLocation = env.getProperty("livy.server.auth.kerberos.keytab");
- logger.info("principal:{}, lcoation:{}", userPrincipal, keyTabLocation);
+ LOGGER.info("principal:{}, lcoation:{}", userPrincipal, keyTabLocation);
KerberosRestTemplate restTemplate = new KerberosRestTemplate(keyTabLocation, userPrincipal);
String result = restTemplate.getForObject(uri, String.class);
- logger.info(result);
+ LOGGER.info(result);
return result;
}
}
public void deleteByLivy(String uri) {
- logger.info("Delete by Livy URI is: " + uri);
+ LOGGER.info("Delete by Livy URI is: " + uri);
String needKerberos = env.getProperty("livy.need.kerberos");
- logger.info("Need Kerberos:" + needKerberos);
+ LOGGER.info("Need Kerberos:" + needKerberos);
if (needKerberos == null || needKerberos.isEmpty()) {
- logger.error("The property \"livy.need.kerberos\" is empty");
+ LOGGER.error("The property \"livy.need.kerberos\" is empty");
return;
}
if (needKerberos.equalsIgnoreCase("false")) {
- logger.info("The livy server doesn't need Kerberos Authentication");
+ LOGGER.info("The livy server doesn't need Kerberos Authentication");
new RestTemplate().delete(uri);
} else {
- logger.info("The livy server needs Kerberos Authentication");
+ LOGGER.info("The livy server needs Kerberos Authentication");
String userPrincipal = env.getProperty("livy.server.auth.kerberos.principal");
String keyTabLocation = env.getProperty("livy.server.auth.kerberos.keytab");
- logger.info("principal:{}, lcoation:{}", userPrincipal, keyTabLocation);
+ LOGGER.info("principal:{}, lcoation:{}", userPrincipal, keyTabLocation);
KerberosRestTemplate restTemplate = new KerberosRestTemplate(keyTabLocation, userPrincipal);
restTemplate.delete(uri);
diff --git a/service/src/main/java/org/apache/griffin/core/job/Predicator.java b/service/src/main/java/org/apache/griffin/core/job/Predicator.java
index dd9e105..153157e 100644
--- a/service/src/main/java/org/apache/griffin/core/job/Predicator.java
+++ b/service/src/main/java/org/apache/griffin/core/job/Predicator.java
@@ -21,6 +21,16 @@ package org.apache.griffin.core.job;
import java.io.IOException;
+/**
+ * A Predicator is an object that judges whether a condition is met.
+ * This interface has only one method, {@link #predicate()}.
+ */
public interface Predicator {
+ /**
+ * Evaluates the condition.
+ *
+ * @return true if the condition is met, otherwise false
+ * @throws IOException if an I/O error occurs while evaluating the condition
+ */
boolean predicate() throws IOException;
}
diff --git a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
index 01e41ca..e48053d 100644
--- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
@@ -19,7 +19,27 @@ under the License.
package org.apache.griffin.core.job;
+import static org.apache.griffin.core.config.EnvConfig.ENV_BATCH;
+import static org.apache.griffin.core.config.EnvConfig.ENV_STREAMING;
+import static org.apache.griffin.core.config.PropertiesConfig.livyConfMap;
+import static org.apache.griffin.core.job.JobInstance.JOB_NAME;
+import static org.apache.griffin.core.job.JobInstance.MEASURE_KEY;
+import static org.apache.griffin.core.job.JobInstance.PREDICATES_KEY;
+import static org.apache.griffin.core.job.JobInstance.PREDICATE_JOB_NAME;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_FOUND;
+import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH;
+import static org.apache.griffin.core.util.JsonUtil.toEntity;
+
import com.fasterxml.jackson.core.type.TypeReference;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.griffin.core.job.entity.JobInstanceBean;
import org.apache.griffin.core.job.entity.SegmentPredicate;
import org.apache.griffin.core.job.factory.PredicatorFactory;
@@ -42,31 +62,21 @@ import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
-import org.springframework.web.client.RestTemplate;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.griffin.core.config.EnvConfig.ENV_BATCH;
-import static org.apache.griffin.core.config.EnvConfig.ENV_STREAMING;
-import static org.apache.griffin.core.config.PropertiesConfig.livyConfMap;
-import static org.apache.griffin.core.job.JobInstance.*;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_FOUND;
-import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH;
-import static org.apache.griffin.core.util.JsonUtil.toEntity;
+/**
+ * Simple implementation of the Quartz Job interface that submits the
+ * Griffin job to a Spark cluster via Livy.
+ *
+ * @see LivyTaskSubmitHelper#postToLivy(String)
+ * @see Job#execute(JobExecutionContext)
+ */
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
@Component
public class SparkSubmitJob implements Job {
private static final Logger LOGGER =
- LoggerFactory.getLogger(SparkSubmitJob.class);
- private static final String REQUEST_BY_HEADER = "X-Requested-By";
+ LoggerFactory.getLogger(SparkSubmitJob.class);
+
@Autowired
private JobInstanceRepo jobInstanceRepo;
@Autowired
@@ -85,7 +95,6 @@ public class SparkSubmitJob implements Job {
private String livyUri;
private List<SegmentPredicate> mPredicates;
private JobInstanceBean jobInstance;
- private RestTemplate restTemplate = new RestTemplate();
@Override
public void execute(JobExecutionContext context) {
@@ -108,8 +117,8 @@ public class SparkSubmitJob implements Job {
}
}
- private void updateJobInstanceState(JobExecutionContext context) throws
- IOException {
+ private void updateJobInstanceState(JobExecutionContext context)
+ throws IOException {
SimpleTrigger simpleTrigger = (SimpleTrigger) context.getTrigger();
int repeatCount = simpleTrigger.getRepeatCount();
int fireCount = simpleTrigger.getTimesTriggered();
@@ -126,9 +135,10 @@ public class SparkSubmitJob implements Job {
if (CollectionUtils.isEmpty(predicates)) {
return true;
}
+
for (SegmentPredicate segPredicate : predicates) {
Predicator predicator = PredicatorFactory
- .newPredicateInstance(segPredicate);
+ .newPredicateInstance(segPredicate);
try {
if (predicator != null && !predicator.predicate()) {
return false;
@@ -136,7 +146,6 @@ public class SparkSubmitJob implements Job {
} catch (Exception e) {
return false;
}
-
}
return true;
}
@@ -144,9 +153,9 @@ public class SparkSubmitJob implements Job {
private void initParam(JobDetail jd) throws IOException {
mPredicates = new ArrayList<>();
jobInstance = jobInstanceRepo.findByPredicateName(jd.getJobDataMap()
- .getString(PREDICATE_JOB_NAME));
+ .getString(PREDICATE_JOB_NAME));
measure = toEntity(jd.getJobDataMap().getString(MEASURE_KEY),
- GriffinMeasure.class);
+ GriffinMeasure.class);
livyUri = env.getProperty("livy.uri");
setPredicates(jd.getJobDataMap().getString(PREDICATES_KEY));
// in order to keep metric name unique, we set job name
@@ -160,12 +169,13 @@ public class SparkSubmitJob implements Job {
return;
}
List<SegmentPredicate> predicates = toEntity(json,
- new TypeReference<List<SegmentPredicate>>() {
- });
+ new TypeReference<List<SegmentPredicate>>() {
+ });
if (predicates != null) {
mPredicates.addAll(predicates);
}
}
+
private String escapeCharacter(String str, String regex) {
if (StringUtils.isEmpty(str)) {
return str;
@@ -196,9 +206,8 @@ public class SparkSubmitJob implements Job {
livyConfMap.put("args", args);
}
-
protected void saveJobInstance(JobDetail jd) throws SchedulerException,
- IOException {
+ IOException {
// If result is null, it may livy uri is wrong
// or livy parameter is wrong.
Map<String, Object> resultMap = post2LivyWithRetry();
@@ -211,13 +220,14 @@ public class SparkSubmitJob implements Job {
}
private Map<String, Object> post2LivyWithRetry()
- throws IOException {
+ throws IOException {
String result = post2Livy();
Map<String, Object> resultMap = null;
if (result != null) {
- resultMap = livyTaskSubmitHelper.retryLivyGetAppId(result,appIdRetryCount);
+ resultMap = livyTaskSubmitHelper.retryLivyGetAppId(result, appIdRetryCount);
if (resultMap != null) {
- livyTaskSubmitHelper.increaseCurTaskNum(Long.valueOf(String.valueOf(resultMap.get("id"))).longValue());
+ livyTaskSubmitHelper.increaseCurTaskNum(Long.valueOf(
+ String.valueOf(resultMap.get("id"))).longValue());
}
}
@@ -225,9 +235,10 @@ public class SparkSubmitJob implements Job {
}
protected void saveJobInstance(String result, State state)
- throws IOException {
+ throws IOException {
TypeReference<HashMap<String, Object>> type =
- new TypeReference<HashMap<String, Object>>() {};
+ new TypeReference<HashMap<String, Object>>() {
+ };
Map<String, Object> resultMap = null;
if (result != null) {
resultMap = toEntity(result, type);
@@ -244,9 +255,9 @@ public class SparkSubmitJob implements Job {
Object id = resultMap.get("id");
Object appId = resultMap.get("appId");
jobInstance.setState(status == null ? null : State.valueOf(status
- .toString().toUpperCase()));
+ .toString().toUpperCase()));
jobInstance.setSessionId(id == null ? null : Long.parseLong(id
- .toString()));
+ .toString()));
jobInstance.setAppId(appId == null ? null : appId.toString());
}
}
diff --git a/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java
index e7ff758..e3fab06 100644
--- a/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java
@@ -60,7 +60,7 @@ import org.springframework.web.client.RestClientException;
@Service
public class StreamingJobOperatorImpl implements JobOperator {
private static final Logger LOGGER = LoggerFactory
- .getLogger(StreamingJobOperatorImpl.class);
+ .getLogger(StreamingJobOperatorImpl.class);
@Autowired
private StreamingJobRepo streamingJobRepo;
@Autowired
@@ -84,7 +84,7 @@ public class StreamingJobOperatorImpl implements JobOperator {
@Override
@Transactional(rollbackFor = Exception.class)
public AbstractJob add(AbstractJob job, GriffinMeasure measure) throws
- Exception {
+ Exception {
validateParams(job);
String qName = jobService.getQuartzName(job);
String qGroup = jobService.getQuartzGroup();
@@ -126,24 +126,24 @@ public class StreamingJobOperatorImpl implements JobOperator {
/* Firstly you should check whether job is scheduled.
If it is scheduled, triggers are empty. */
List<? extends Trigger> triggers = jobService.getTriggers(
- job.getName(),
- job.getGroup());
+ job.getName(),
+ job.getGroup());
if (!CollectionUtils.isEmpty(triggers)) {
throw new GriffinException.BadRequestException
- (STREAMING_JOB_IS_RUNNING);
+ (STREAMING_JOB_IS_RUNNING);
}
/* Secondly you should check whether job instance is running. */
List<JobInstanceBean> instances = instanceRepo.findByJobId(job.getId());
instances.stream().filter(instance -> !instance.isDeleted()).forEach
- (instance -> {
- State state = instance.getState();
- String quartzState = convert2QuartzState(state);
- if (!getStartStatus(quartzState)) {
- throw new GriffinException.BadRequestException
- (STREAMING_JOB_IS_RUNNING);
- }
- instance.setDeleted(true);
- });
+ (instance -> {
+ State state = instance.getState();
+ String quartzState = convert2QuartzState(state);
+ if (!getStartStatus(quartzState)) {
+ throw new GriffinException.BadRequestException
+ (STREAMING_JOB_IS_RUNNING);
+ }
+ instance.setDeleted(true);
+ });
}
@@ -174,7 +174,7 @@ public class StreamingJobOperatorImpl implements JobOperator {
public JobState getState(AbstractJob job, String action) {
JobState jobState = new JobState();
List<JobInstanceBean> instances = instanceRepo
- .findByJobId(job.getId());
+ .findByJobId(job.getId());
for (JobInstanceBean instance : instances) {
State state = instance.getState();
if (!instance.isDeleted() && state != null) {
@@ -208,7 +208,7 @@ public class StreamingJobOperatorImpl implements JobOperator {
* started
*/
private boolean getStartStatus(String state) {
- return !"NORMAL" .equals(state) && !"BLOCKED" .equals(state);
+ return !"NORMAL".equals(state) && !"BLOCKED".equals(state);
}
/**
@@ -219,15 +219,15 @@ public class StreamingJobOperatorImpl implements JobOperator {
* stopped
*/
private boolean getStopStatus(String state) {
- return !"COMPLETE" .equals(state) && !"ERROR" .equals(state);
+ return !"COMPLETE".equals(state) && !"ERROR".equals(state);
}
private void deleteByLivy(JobInstanceBean instance) {
Long sessionId = instance.getSessionId();
if (sessionId == null) {
LOGGER.warn("Session id of instance({},{}) is null.", instance
- .getPredicateGroup(), instance.getPredicateName
- ());
+ .getPredicateGroup(), instance.getPredicateName
+ ());
return;
}
String url = livyUri + "/" + instance.getSessionId();
@@ -236,15 +236,15 @@ public class StreamingJobOperatorImpl implements JobOperator {
livyTaskSubmitHelper.deleteByLivy(url);
LOGGER.info("Job instance({}) has been deleted. {}", instance
- .getSessionId(), url);
+ .getSessionId(), url);
} catch (ResourceAccessException e) {
LOGGER.error("Your url may be wrong. Please check {}.\n {}",
- livyUri, e.getMessage());
+ livyUri, e.getMessage());
} catch (RestClientException e) {
LOGGER.warn("sessionId({}) appId({}) {}.", instance.getSessionId(),
- instance.getAppId(), e.getMessage());
+ instance.getAppId(), e.getMessage());
YarnNetUtil.delete(env.getProperty("yarn.uri"),
- instance.getAppId());
+ instance.getAppId());
}
}
@@ -255,23 +255,23 @@ public class StreamingJobOperatorImpl implements JobOperator {
* job
*/
private void stop(StreamingJob job, boolean delete) throws
- SchedulerException {
+ SchedulerException {
pauseJob(job);
/* to prevent situation that streaming job is submitted
before pause or when pausing. */
List<JobInstanceBean> instances = instanceRepo
- .findByJobId(job.getId());
+ .findByJobId(job.getId());
instances.stream().filter(instance -> !instance.isDeleted())
- .forEach(instance -> {
- State state = instance.getState();
- String quartzState = convert2QuartzState(state);
- if (getStopStatus(quartzState)) {
- deleteByLivy(instance);
-
- }
- instance.setState(STOPPED);
- instance.setDeleted(true);
- });
+ .forEach(instance -> {
+ State state = instance.getState();
+ String quartzState = convert2QuartzState(state);
+ if (getStopStatus(quartzState)) {
+ deleteByLivy(instance);
+
+ }
+ instance.setState(STOPPED);
+ instance.setDeleted(true);
+ });
job.setDeleted(delete);
streamingJobRepo.save(job);
}
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java
index 660eb27..1c48adf 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java
@@ -57,20 +57,20 @@ import org.slf4j.LoggerFactory;
@Table(name = "job")
@Inheritance(strategy = InheritanceType.SINGLE_TABLE)
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY,
- property = "job.type")
+ property = "job.type")
@JsonSubTypes({@JsonSubTypes.Type(value = BatchJob.class, name = "batch"),
- @JsonSubTypes.Type(
- value = StreamingJob.class,
- name = "streaming"),
- @JsonSubTypes.Type(
- value = VirtualJob.class,
- name = "virtual")})
+ @JsonSubTypes.Type(
+ value = StreamingJob.class,
+ name = "streaming"),
+ @JsonSubTypes.Type(
+ value = VirtualJob.class,
+ name = "virtual")})
@DiscriminatorColumn(name = "type")
public abstract class AbstractJob extends AbstractAuditableEntity {
private static final long serialVersionUID = 7569493377868453677L;
private static final Logger LOGGER = LoggerFactory
- .getLogger(AbstractJob.class);
+ .getLogger(AbstractJob.class);
protected Long measureId;
@@ -107,7 +107,7 @@ public abstract class AbstractJob extends AbstractAuditableEntity {
@NotNull
@OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST,
- CascadeType.REMOVE, CascadeType.MERGE})
+ CascadeType.REMOVE, CascadeType.MERGE})
@JoinColumn(name = "job_id")
private List<JobDataSegment> segments = new ArrayList<>();
@@ -236,8 +236,8 @@ public abstract class AbstractJob extends AbstractAuditableEntity {
public void load() throws IOException {
if (!StringUtils.isEmpty(predicateConfig)) {
this.configMap = JsonUtil.toEntity(predicateConfig,
- new TypeReference<Map<String, Object>>() {
- });
+ new TypeReference<Map<String, Object>>() {
+ });
}
}
@@ -265,7 +265,6 @@ public abstract class AbstractJob extends AbstractAuditableEntity {
this.deleted = deleted;
}
-
AbstractJob(String jobName, Long measureId, String metricName) {
this.jobName = jobName;
this.measureId = measureId;
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
index 2cc394f..2272e14 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
@@ -39,7 +39,7 @@ public class JobDataSegment extends AbstractAuditableEntity {
private static final long serialVersionUID = -9056531122243340484L;
private static final Logger LOGGER = LoggerFactory
- .getLogger(JobDataSegment.class);
+ .getLogger(JobDataSegment.class);
@NotNull
private String dataConnectorName;
@@ -47,7 +47,7 @@ public class JobDataSegment extends AbstractAuditableEntity {
private boolean asTsBaseline = false;
@OneToOne(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST,
- CascadeType.REMOVE, CascadeType.MERGE})
+ CascadeType.REMOVE, CascadeType.MERGE})
@JoinColumn(name = "segment_range_id")
private SegmentRange segmentRange = new SegmentRange();
@@ -77,7 +77,7 @@ public class JobDataSegment extends AbstractAuditableEntity {
public void setDataConnectorName(String dataConnectorName) {
if (StringUtils.isEmpty(dataConnectorName)) {
LOGGER.warn(" Data connector name is invalid. " +
- "Please check your connector name.");
+ "Please check your connector name.");
throw new NullPointerException();
}
this.dataConnectorName = dataConnectorName;
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java
index ef0b139..23b70a4 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java
@@ -203,7 +203,7 @@ public class JobInstanceBean extends AbstractAuditableEntity {
this.state = state;
this.tms = tms;
this.expireTms = expireTms;
- this.appId=appId;
+ this.appId = appId;
}
public JobInstanceBean(State state, Long tms, Long expireTms) {
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java b/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
index de122f5..2e892f6 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
@@ -127,7 +127,7 @@ public class LivySessionStates {
public static boolean isActive(State state) {
if (UNKNOWN.equals(state) || STOPPED.equals(state) || NOT_FOUND.equals
- (state) || FOUND.equals(state)) {
+ (state) || FOUND.equals(state)) {
// set UNKNOWN isActive() as false.
return false;
} else if (FINDING.equals(state)) {
@@ -143,8 +143,8 @@ public class LivySessionStates {
return "COMPLETE";
}
if (UNKNOWN.equals(state) || NOT_FOUND.equals(state)
- || FOUND.equals(state) || sessionState == null
- || !sessionState.isActive()) {
+ || FOUND.equals(state) || sessionState == null
+ || !sessionState.isActive()) {
return "ERROR";
}
return "NORMAL";
@@ -153,9 +153,9 @@ public class LivySessionStates {
public static boolean isHealthy(State state) {
return !(State.ERROR.equals(state) || State.DEAD.equals(state)
- || State.SHUTTING_DOWN.equals(state)
- || State.FINDING.equals(state)
- || State.NOT_FOUND.equals(state)
- || State.FOUND.equals(state));
+ || State.SHUTTING_DOWN.equals(state)
+ || State.FINDING.equals(state)
+ || State.NOT_FOUND.equals(state)
+ || State.FOUND.equals(state));
}
}
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
index 138ab0f..843ad43 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
@@ -17,7 +17,6 @@ specific language governing permissions and limitations
under the License.
*/
-
package org.apache.griffin.core.job.entity;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -87,16 +86,16 @@ public class SegmentPredicate extends AbstractAuditableEntity {
public void load() throws IOException {
if (!StringUtils.isEmpty(config)) {
this.configMap = JsonUtil.toEntity(config,
- new TypeReference<Map<String, Object>>() {
- });
+ new TypeReference<Map<String, Object>>() {
+ });
}
}
public SegmentPredicate() {
}
- public SegmentPredicate(String type, Map<String, String> configMap) throws
- JsonProcessingException {
+ public SegmentPredicate(String type, Map<String, String> configMap)
+ throws JsonProcessingException {
this.type = type;
this.config = JsonUtil.toJson(configMap);
}
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java
index f9fcc72..a486f8c 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java
@@ -17,7 +17,6 @@ specific language governing permissions and limitations
under the License.
*/
-
package org.apache.griffin.core.job.entity;
import javax.persistence.Column;
@@ -35,7 +34,6 @@ public class SegmentRange extends AbstractAuditableEntity {
private String length = "1h";
-
public String getBegin() {
return begin;
}
diff --git a/service/src/main/java/org/apache/griffin/core/job/factory/AutowiringSpringBeanJobFactory.java b/service/src/main/java/org/apache/griffin/core/job/factory/AutowiringSpringBeanJobFactory.java
index 8bb3d60..78d1d78 100644
--- a/service/src/main/java/org/apache/griffin/core/job/factory/AutowiringSpringBeanJobFactory.java
+++ b/service/src/main/java/org/apache/griffin/core/job/factory/AutowiringSpringBeanJobFactory.java
@@ -27,10 +27,18 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
+/**
+ * AutowiringSpringBeanJobFactory is a special job factory that adds auto-wiring support to
+ * {@link SpringBeanJobFactory}, allowing you to inject properties from the scheduler context, job data map
+ * and trigger data entries into the job bean.
+ *
+ * @see SpringBeanJobFactory
+ * @see ApplicationContextAware
+ */
public final class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory
- implements ApplicationContextAware {
+ implements ApplicationContextAware {
private static final Logger LOGGER = LoggerFactory
- .getLogger(AutowiringSpringBeanJobFactory.class);
+ .getLogger(AutowiringSpringBeanJobFactory.class);
private transient AutowireCapableBeanFactory beanFactory;
@@ -41,12 +49,10 @@ public final class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory
@Override
protected Object createJobInstance(final TriggerFiredBundle bundle) {
-
try {
final Object job = super.createJobInstance(bundle);
beanFactory.autowireBean(job);
return job;
-
} catch (Exception e) {
LOGGER.error("fail to create job instance. {}", e);
}
diff --git a/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java b/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
index 7671a42..7f16ef7 100644
--- a/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
+++ b/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
public class PredicatorFactory {
private static final Logger LOGGER = LoggerFactory
- .getLogger(PredicatorFactory.class);
+ .getLogger(PredicatorFactory.class);
public static Predicator newPredicateInstance(SegmentPredicate segPredicate) {
Predicator predicate;
@@ -63,7 +63,7 @@ public class PredicatorFactory {
throw new GriffinException.ServiceException(message, e);
} catch (NoSuchMethodException e) {
String message = "For predicate with type " + predicateClassName +
- " constructor with parameter of type " + SegmentPredicate.class.getName() + " not found";
+ " constructor with parameter of type " + SegmentPredicate.class.getName() + " not found";
LOGGER.error(message, e);
throw new GriffinException.ServiceException(message, e);
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
index 218717a..4db291d 100644
--- a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
+++ b/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
@@ -30,7 +30,7 @@ import org.springframework.data.repository.CrudRepository;
import org.springframework.transaction.annotation.Transactional;
public interface JobInstanceRepo
- extends CrudRepository<JobInstanceBean, Long> {
+ extends CrudRepository<JobInstanceBean, Long> {
JobInstanceBean findByPredicateName(String name);
@@ -48,7 +48,7 @@ public interface JobInstanceRepo
@Transactional(rollbackFor = Exception.class)
@Modifying
@Query("delete from JobInstanceBean j " +
- "where j.expireTms <= ?1 and j.deleted = false ")
+ "where j.expireTms <= ?1 and j.deleted = false ")
int deleteByExpireTimestamp(Long expireTms);
@Query("select DISTINCT s from JobInstanceBean s where s.state in ?1")
diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobRepo.java
index 75f8048..a9bfbe2 100644
--- a/service/src/main/java/org/apache/griffin/core/job/repo/JobRepo.java
+++ b/service/src/main/java/org/apache/griffin/core/job/repo/JobRepo.java
@@ -28,7 +28,7 @@ import org.springframework.data.repository.CrudRepository;
public interface JobRepo<T extends AbstractJob> extends CrudRepository<T, Long> {
@Query("select count(j) from #{#entityName} j " +
- "where j.jobName = ?1 and j.deleted = ?2")
+ "where j.jobName = ?1 and j.deleted = ?2")
int countByJobNameAndDeleted(String jobName, Boolean deleted);
List<T> findByDeleted(boolean deleted);
diff --git a/service/src/main/java/org/apache/griffin/core/login/LoginController.java b/service/src/main/java/org/apache/griffin/core/login/LoginController.java
index 3d08551..51b4cae 100644
--- a/service/src/main/java/org/apache/griffin/core/login/LoginController.java
+++ b/service/src/main/java/org/apache/griffin/core/login/LoginController.java
@@ -37,7 +37,7 @@ public class LoginController {
@RequestMapping(value = "/authenticate", method = RequestMethod.POST)
public ResponseEntity<Map<String, Object>> login(
- @RequestBody Map<String, String> map) {
+ @RequestBody Map<String, String> map) {
return loginService.login(map);
}
}
diff --git a/service/src/main/java/org/apache/griffin/core/login/LoginService.java b/service/src/main/java/org/apache/griffin/core/login/LoginService.java
index 11c3a3d..3c78bb1 100644
--- a/service/src/main/java/org/apache/griffin/core/login/LoginService.java
+++ b/service/src/main/java/org/apache/griffin/core/login/LoginService.java
@@ -23,6 +23,14 @@ import java.util.Map;
import org.springframework.http.ResponseEntity;
+/**
+ * LoginService defines an abstract validation method for the login action; implement
+ * it to customize the authentication logic.
+ *
+ * @see org.apache.griffin.core.config.LoginConfig
+ * @see LoginServiceDefaultImpl
+ * @see LoginServiceLdapImpl
+ */
public interface LoginService {
ResponseEntity<Map<String, Object>> login(Map<String, String> map);
diff --git a/service/src/main/java/org/apache/griffin/core/login/LoginServiceDefaultImpl.java b/service/src/main/java/org/apache/griffin/core/login/LoginServiceDefaultImpl.java
index fc54ea4..fc4041c 100644
--- a/service/src/main/java/org/apache/griffin/core/login/LoginServiceDefaultImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/login/LoginServiceDefaultImpl.java
@@ -34,10 +34,9 @@ public class LoginServiceDefaultImpl implements LoginService {
if (StringUtils.isBlank(username)) {
username = "Anonymous";
}
- String fullName = username;
Map<String, Object> message = new HashMap<>();
message.put("ntAccount", username);
- message.put("fullName", fullName);
+ message.put("fullName", username);
message.put("status", 0);
return new ResponseEntity<>(message, HttpStatus.OK);
}
diff --git a/service/src/main/java/org/apache/griffin/core/login/LoginServiceLdapImpl.java b/service/src/main/java/org/apache/griffin/core/login/LoginServiceLdapImpl.java
index 56f6765..1389d12 100644
--- a/service/src/main/java/org/apache/griffin/core/login/LoginServiceLdapImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/login/LoginServiceLdapImpl.java
@@ -43,10 +43,10 @@ import org.springframework.http.ResponseEntity;
public class LoginServiceLdapImpl implements LoginService {
private static final Logger LOGGER = LoggerFactory.getLogger
- (LoginServiceLdapImpl.class);
+ (LoginServiceLdapImpl.class);
private static final String LDAP_FACTORY =
- "com.sun.jndi.ldap.LdapCtxFactory";
+ "com.sun.jndi.ldap.LdapCtxFactory";
private String url;
private String email;
@@ -86,7 +86,7 @@ public class LoginServiceLdapImpl implements LoginService {
ctx = getContextInstance(toPrincipal(bindAccount), bindPassword);
NamingEnumeration<SearchResult> results = ctx.search(searchBase,
- searchFilter, searchControls);
+ searchFilter, searchControls);
SearchResult userObject = getSingleUser(results);
// verify password if different bind user is used
@@ -102,7 +102,7 @@ public class LoginServiceLdapImpl implements LoginService {
return new ResponseEntity<>(message, HttpStatus.OK);
} catch (AuthenticationException e) {
LOGGER.warn("User {} failed to login with LDAP auth. {}", username,
- e.getMessage());
+ e.getMessage());
} catch (NamingException e) {
LOGGER.warn(String.format("User %s failed to login with LDAP auth.", username), e);
} finally {
@@ -129,7 +129,7 @@ public class LoginServiceLdapImpl implements LoginService {
if (results.hasMoreElements()) {
SearchResult second = results.nextElement();
throw new NamingException(String.format("Ambiguous search, found two users: %s, %s",
- result.getNameInNamespace(), second.getNameInNamespace()));
+ result.getNameInNamespace(), second.getNameInNamespace()));
}
return result;
}
@@ -161,7 +161,7 @@ public class LoginServiceLdapImpl implements LoginService {
}
} catch (NamingException e) {
LOGGER.warn("User {} successfully login with LDAP auth, " +
- "but failed to get full name.", ntAccount);
+ "but failed to get full name.", ntAccount);
return ntAccount;
}
}
@@ -175,7 +175,7 @@ public class LoginServiceLdapImpl implements LoginService {
}
private LdapContext getContextInstance(String principal, String password)
- throws NamingException {
+ throws NamingException {
Hashtable<String, String> ht = new Hashtable<>();
ht.put(Context.INITIAL_CONTEXT_FACTORY, LDAP_FACTORY);
ht.put(Context.PROVIDER_URL, url);
diff --git a/service/src/main/java/org/apache/griffin/core/login/ldap/SelfSignedSocketFactory.java b/service/src/main/java/org/apache/griffin/core/login/ldap/SelfSignedSocketFactory.java
index 187365f..028d878 100644
--- a/service/src/main/java/org/apache/griffin/core/login/ldap/SelfSignedSocketFactory.java
+++ b/service/src/main/java/org/apache/griffin/core/login/ldap/SelfSignedSocketFactory.java
@@ -35,7 +35,7 @@ import org.apache.griffin.core.exception.GriffinException;
/**
* SocketFactory ignoring insecure (self-signed, expired) certificates.
- *
+ * <p>
* Maintains internal {@code SSLSocketFactory} configured with {@code NoopTrustManager}.
* All SocketFactory methods are proxied to internal SSLSocketFactory instance.
* Accepts all client and server certificates, from any issuers.
diff --git a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperatorImpl.java
index c941f28..dda373a 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperatorImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperatorImpl.java
@@ -55,9 +55,9 @@ public class ExternalMeasureOperatorImpl implements MeasureOperator {
ExternalMeasure latestMeasure = (ExternalMeasure) measure;
validateMeasure(latestMeasure);
ExternalMeasure originMeasure = measureRepo.findOne(
- latestMeasure.getId());
+ latestMeasure.getId());
VirtualJob vj = genVirtualJob(latestMeasure,
- originMeasure.getVirtualJob());
+ originMeasure.getVirtualJob());
latestMeasure.setVirtualJob(vj);
measure = measureRepo.save(latestMeasure);
return measure;
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
index 3c5e0b0..a6921bf 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
@@ -42,7 +42,7 @@ public class MeasureController {
@RequestMapping(value = "/measures", method = RequestMethod.GET)
public List<? extends Measure> getAllAliveMeasures(@RequestParam(value =
- "type", defaultValue = "") String type) {
+ "type", defaultValue = "") String type) {
return measureService.getAllAliveMeasures(type);
}
@@ -54,7 +54,7 @@ public class MeasureController {
@RequestMapping(value = "/measures/{id}", method = RequestMethod.DELETE)
@ResponseStatus(HttpStatus.NO_CONTENT)
public void deleteMeasureById(@PathVariable("id") Long id) throws
- SchedulerException {
+ SchedulerException {
measureService.deleteMeasureById(id);
}
@@ -71,7 +71,7 @@ public class MeasureController {
}
@RequestMapping(value = "/measures/owner/{owner}", method =
- RequestMethod.GET)
+ RequestMethod.GET)
public List<Measure> getAliveMeasuresByOwner(@PathVariable("owner")
@Valid String owner) {
return measureService.getAliveMeasuresByOwner(owner);
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java
index accf1a5..4801ec2 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java
@@ -32,6 +32,6 @@ public interface MeasureOrgService {
Map<String, Map<String, List<Map<String, Object>>>>
getMeasureWithJobDetailsGroupByOrg(Map<String,
- List<Map<String, Object>>>
- jobDetailsGroupByMeasure);
+ List<Map<String, Object>>>
+ jobDetailsGroupByMeasure);
}
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java
index 64ec06a..13344c9 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java
@@ -50,7 +50,7 @@ public class MeasureOrgServiceImpl implements MeasureOrgService {
List<String> orgs = measureRepo.findNameByOrganization(org, false);
if (CollectionUtils.isEmpty(orgs)) {
throw new GriffinException.NotFoundException
- (ORGANIZATION_NAME_DOES_NOT_EXIST);
+ (ORGANIZATION_NAME_DOES_NOT_EXIST);
}
return orgs;
}
@@ -64,7 +64,7 @@ public class MeasureOrgServiceImpl implements MeasureOrgService {
orgName = orgName == null ? "null" : orgName;
String measureName = measure.getName();
List<String> measureList = orgWithMetricsMap.getOrDefault(orgName,
- new ArrayList<>());
+ new ArrayList<>());
measureList.add(measureName);
orgWithMetricsMap.put(orgName, measureList);
}
@@ -74,9 +74,9 @@ public class MeasureOrgServiceImpl implements MeasureOrgService {
@Override
public Map<String, Map<String, List<Map<String, Object>>>>
getMeasureWithJobDetailsGroupByOrg(Map<String,
- List<Map<String, Object>>> jobDetails) {
+ List<Map<String, Object>>> jobDetails) {
Map<String, Map<String, List<Map<String, Object>>>> result =
- new HashMap<>();
+ new HashMap<>();
List<GriffinMeasure> measures = measureRepo.findByDeleted(false);
if (measures == null) {
return null;
@@ -86,9 +86,9 @@ public class MeasureOrgServiceImpl implements MeasureOrgService {
String measureName = measure.getName();
String measureId = measure.getId().toString();
List<Map<String, Object>> jobList = jobDetails
- .getOrDefault(measureId, new ArrayList<>());
+ .getOrDefault(measureId, new ArrayList<>());
Map<String, List<Map<String, Object>>> measureWithJobs = result
- .getOrDefault(orgName, new HashMap<>());
+ .getOrDefault(orgName, new HashMap<>());
measureWithJobs.put(measureName, jobList);
result.put(orgName, measureWithJobs);
}
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
index 1ea945e..97e8117 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
@@ -45,7 +45,7 @@ import org.springframework.util.CollectionUtils;
@Service
public class MeasureServiceImpl implements MeasureService {
private static final Logger LOGGER = LoggerFactory
- .getLogger(MeasureServiceImpl.class);
+ .getLogger(MeasureServiceImpl.class);
private static final String GRIFFIN = "griffin";
private static final String EXTERNAL = "external";
@@ -77,7 +77,7 @@ public class MeasureServiceImpl implements MeasureService {
Measure measure = measureRepo.findByIdAndDeleted(id, false);
if (measure == null) {
throw new GriffinException
- .NotFoundException(MEASURE_ID_DOES_NOT_EXIST);
+ .NotFoundException(MEASURE_ID_DOES_NOT_EXIST);
}
return measure;
}
@@ -90,12 +90,12 @@ public class MeasureServiceImpl implements MeasureService {
@Override
public Measure createMeasure(Measure measure) {
List<Measure> aliveMeasureList = measureRepo
- .findByNameAndDeleted(measure.getName(), false);
+ .findByNameAndDeleted(measure.getName(), false);
if (!CollectionUtils.isEmpty(aliveMeasureList)) {
LOGGER.warn("Failed to create new measure {}, it already exists.",
- measure.getName());
+ measure.getName());
throw new GriffinException.ConflictException(
- MEASURE_NAME_ALREADY_EXIST);
+ MEASURE_NAME_ALREADY_EXIST);
}
MeasureOperator op = getOperation(measure);
return op.create(measure);
@@ -106,12 +106,12 @@ public class MeasureServiceImpl implements MeasureService {
Measure m = measureRepo.findByIdAndDeleted(measure.getId(), false);
if (m == null) {
throw new GriffinException.NotFoundException(
- MEASURE_ID_DOES_NOT_EXIST);
+ MEASURE_ID_DOES_NOT_EXIST);
}
if (!m.getType().equals(measure.getType())) {
LOGGER.warn("Can't update measure to different type.");
throw new GriffinException.BadRequestException(
- MEASURE_TYPE_DOES_NOT_MATCH);
+ MEASURE_TYPE_DOES_NOT_MATCH);
}
MeasureOperator op = getOperation(measure);
return op.update(measure);
@@ -122,7 +122,7 @@ public class MeasureServiceImpl implements MeasureService {
Measure measure = measureRepo.findByIdAndDeleted(measureId, false);
if (measure == null) {
throw new GriffinException.NotFoundException(
- MEASURE_ID_DOES_NOT_EXIST);
+ MEASURE_ID_DOES_NOT_EXIST);
}
MeasureOperator op = getOperation(measure);
op.delete(measure);
@@ -144,7 +144,7 @@ public class MeasureServiceImpl implements MeasureService {
return externalOp;
}
throw new GriffinException.BadRequestException(
- MEASURE_TYPE_DOES_NOT_SUPPORT);
+ MEASURE_TYPE_DOES_NOT_SUPPORT);
}
}
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/AbstractAuditableEntity.java b/service/src/main/java/org/apache/griffin/core/measure/entity/AbstractAuditableEntity.java
index cb5a399..5bcf9fa 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/AbstractAuditableEntity.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/AbstractAuditableEntity.java
@@ -28,6 +28,10 @@ import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.MappedSuperclass;
+/**
+ * AbstractAuditableEntity is the base entity class in Apache Griffin; all
+ * {@link javax.persistence.Entity} classes should extend it.
+ */
@MappedSuperclass
public abstract class AbstractAuditableEntity implements Serializable {
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
index f0a0038..970977b 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
@@ -52,7 +52,7 @@ public class DataSource extends AbstractAuditableEntity {
private String name;
@OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST,
- CascadeType.REMOVE, CascadeType.MERGE})
+ CascadeType.REMOVE, CascadeType.MERGE})
@JoinColumn(name = "data_source_id")
private List<DataConnector> connectors = new ArrayList<>();
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java b/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java
index 9584800..3107852 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java
@@ -34,7 +34,7 @@ public class EvaluateRule extends AbstractAuditableEntity {
private static final long serialVersionUID = 4240072518233967528L;
@OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST,
- CascadeType.REMOVE, CascadeType.MERGE})
+ CascadeType.REMOVE, CascadeType.MERGE})
@JoinColumn(name = "evaluate_rule_id")
private List<Rule> rules = new ArrayList<>();
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
index f75e9d6..9949949 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
@@ -47,10 +47,10 @@ import org.apache.griffin.core.util.JsonUtil;
@Entity
@Inheritance(strategy = InheritanceType.JOINED)
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY,
- property = "measure.type")
+ property = "measure.type")
@JsonSubTypes({
- @JsonSubTypes.Type(value = GriffinMeasure.class, name = "griffin"),
- @JsonSubTypes.Type(value = ExternalMeasure.class, name = "external")})
+ @JsonSubTypes.Type(value = GriffinMeasure.class, name = "griffin"),
+ @JsonSubTypes.Type(value = ExternalMeasure.class, name = "external")})
public abstract class Measure extends AbstractAuditableEntity {
private static final long serialVersionUID = -4748881017029815714L;
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
index 3792e4d..de3a9b2 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
@@ -186,12 +186,12 @@ public class Rule extends AbstractAuditableEntity {
if (!StringUtils.isEmpty(details)) {
this.detailsMap = JsonUtil.toEntity(
details, new TypeReference<Map<String, Object>>() {
- });
+ });
}
if (!StringUtils.isEmpty(out)) {
this.outList = JsonUtil.toEntity(
out, new TypeReference<List<Map<String, Object>>>() {
- });
+ });
}
}
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java b/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java
index 963304b..1387c7e 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java
@@ -122,8 +122,8 @@ public class StreamingPreProcess extends AbstractAuditableEntity {
public void load() throws IOException {
if (!StringUtils.isEmpty(details)) {
this.detailsMap = JsonUtil.toEntity(details,
- new TypeReference<Map<String, Object>>() {
- });
+ new TypeReference<Map<String, Object>>() {
+ });
}
}
diff --git a/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java b/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java
index eb24b1d..a96415c 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java
@@ -32,7 +32,7 @@ import org.springframework.data.repository.CrudRepository;
* @param <T> Measure and its subclass
*/
public interface MeasureRepo<T extends Measure>
- extends CrudRepository<T, Long> {
+ extends CrudRepository<T, Long> {
/**
* search repository by name and deletion state
@@ -76,7 +76,7 @@ public interface MeasureRepo<T extends Measure>
* @return organization collection
*/
@Query("select DISTINCT m.organization from #{#entityName} m "
- + "where m.deleted = ?1 and m.organization is not null")
+ + "where m.deleted = ?1 and m.organization is not null")
List<String> findOrganizations(Boolean deleted);
/**
@@ -87,6 +87,6 @@ public interface MeasureRepo<T extends Measure>
* @return organization collection
*/
@Query("select m.name from #{#entityName} m "
- + "where m.organization= ?1 and m.deleted= ?2")
+ + "where m.organization= ?1 and m.deleted= ?2")
List<String> findNameByOrganization(String organization, Boolean deleted);
}
diff --git a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreProxy.java b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreProxy.java
index adb71e9..f9c1236 100644
--- a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreProxy.java
+++ b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreProxy.java
@@ -33,7 +33,7 @@ import org.springframework.stereotype.Component;
@Component
public class HiveMetaStoreProxy {
private static final Logger LOGGER = LoggerFactory
- .getLogger(HiveMetaStoreProxy.class);
+ .getLogger(HiveMetaStoreProxy.class);
@Value("${hive.metastore.uris}")
private String uris;
@@ -60,7 +60,7 @@ public class HiveMetaStoreProxy {
HiveConf hiveConf = new HiveConf();
hiveConf.set("hive.metastore.local", "false");
hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES,
- 3);
+ 3);
hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, uris);
hiveConf.setIntVar(HiveConf.ConfVars.HMSHANDLERATTEMPTS, attempts);
hiveConf.setVar(HiveConf.ConfVars.HMSHANDLERINTERVAL, interval);
diff --git a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java
index a12dc08..d855183 100644
--- a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java
@@ -47,7 +47,7 @@ import org.springframework.util.StringUtils;
public class HiveMetaStoreServiceImpl implements HiveMetaStoreService {
private static final Logger LOGGER = LoggerFactory
- .getLogger(HiveMetaStoreService.class);
+ .getLogger(HiveMetaStoreService.class);
@Autowired
private IMetaStoreClient client = null;
@@ -70,7 +70,7 @@ public class HiveMetaStoreServiceImpl implements HiveMetaStoreService {
try {
if (client == null) {
LOGGER.warn("Hive client is null. " +
- "Please check your hive config.");
+ "Please check your hive config.");
return new ArrayList<>();
}
results = client.getAllDatabases();
@@ -89,7 +89,7 @@ public class HiveMetaStoreServiceImpl implements HiveMetaStoreService {
try {
if (client == null) {
LOGGER.warn("Hive client is null. " +
- "Please check your hive config.");
+ "Please check your hive config.");
return new ArrayList<>();
}
results = client.getAllTables(getUseDbName(dbName));
@@ -112,7 +112,7 @@ public class HiveMetaStoreServiceImpl implements HiveMetaStoreService {
@Cacheable(unless = "#result==null || #result.isEmpty()")
public Map<String, List<String>> getAllTableNames() {
Map<String, List<String>> result = new HashMap<>();
- for (String dbName: getAllDatabases()) {
+ for (String dbName : getAllDatabases()) {
result.put(dbName, Lists.newArrayList(getAllTableNames(dbName)));
}
return result;
@@ -143,30 +143,30 @@ public class HiveMetaStoreServiceImpl implements HiveMetaStoreService {
@Override
- @Cacheable(unless="#result==null")
+ @Cacheable(unless = "#result==null")
public Table getTable(String dbName, String tableName) {
Table result = null;
try {
if (client == null) {
LOGGER.warn("Hive client is null. " +
- "Please check your hive config.");
+ "Please check your hive config.");
return null;
}
result = client.getTable(getUseDbName(dbName), tableName);
} catch (Exception e) {
reconnect();
LOGGER.error("Exception fetching table info : {}. {}", tableName,
- e);
+ e);
}
return result;
}
@Scheduled(fixedRateString =
- "${cache.evict.hive.fixedRate.in.milliseconds}")
+ "${cache.evict.hive.fixedRate.in.milliseconds}")
@CacheEvict(
- cacheNames = "hive",
- allEntries = true,
- beforeInvocation = true)
+ cacheNames = "hive",
+ allEntries = true,
+ beforeInvocation = true)
public void evictHiveCache() {
LOGGER.info("Evict hive cache");
// TODO: calls within same bean are not cached -- this call is not populating anything
@@ -182,7 +182,7 @@ public class HiveMetaStoreServiceImpl implements HiveMetaStoreService {
try {
if (client == null) {
LOGGER.warn("Hive client is null. " +
- "Please check your hive config.");
+ "Please check your hive config.");
return allTables;
}
Iterable<String> tables = client.getAllTables(useDbName);
diff --git a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceJdbcImpl.java b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceJdbcImpl.java
index dcc83e7..0df028f 100644
--- a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceJdbcImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceJdbcImpl.java
@@ -55,7 +55,7 @@ import org.springframework.stereotype.Service;
public class HiveMetaStoreServiceJdbcImpl implements HiveMetaStoreService {
private static final Logger LOGGER = LoggerFactory
- .getLogger(HiveMetaStoreService.class);
+ .getLogger(HiveMetaStoreService.class);
private static final String SHOW_TABLES_IN = "show tables in ";
@@ -190,17 +190,18 @@ public class HiveMetaStoreServiceJdbcImpl implements HiveMetaStoreService {
}
@Scheduled(fixedRateString =
- "${cache.evict.hive.fixedRate.in.milliseconds}")
+ "${cache.evict.hive.fixedRate.in.milliseconds}")
@CacheEvict(
- cacheNames = "jdbcHive",
- allEntries = true,
- beforeInvocation = true)
+ cacheNames = "jdbcHive",
+ allEntries = true,
+ beforeInvocation = true)
public void evictHiveCache() {
LOGGER.info("Evict hive cache");
}
/**
* Query Hive for Show tables or show databases, which will return List of String
+ *
* @param sql sql string
* @return
*/
@@ -248,6 +249,7 @@ public class HiveMetaStoreServiceJdbcImpl implements HiveMetaStoreService {
/**
* Get the Hive table location from hive table metadata string
+ *
* @param tableMetadata hive table metadata string
* @return Hive table location
*/
@@ -271,28 +273,28 @@ public class HiveMetaStoreServiceJdbcImpl implements HiveMetaStoreService {
/**
* Get the Hive table schema: column name, column type, column comment
* The input String looks like following:
- *
+ * <p>
* CREATE TABLE `employee`(
- * `eid` int,
- * `name` string,
- * `salary` string,
- * `destination` string)
+ * `eid` int,
+ * `name` string,
+ * `salary` string,
+ * `destination` string)
* COMMENT 'Employee details'
* ROW FORMAT SERDE
- * 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+ * 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
* WITH SERDEPROPERTIES (
- * 'field.delim'='\t',
- * 'line.delim'='\n',
- * 'serialization.format'='\t')
+ * 'field.delim'='\t',
+ * 'line.delim'='\n',
+ * 'serialization.format'='\t')
* STORED AS INPUTFORMAT
- * 'org.apache.hadoop.mapred.TextInputFormat'
+ * 'org.apache.hadoop.mapred.TextInputFormat'
* OUTPUTFORMAT
- * 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+ * 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
* LOCATION
- * 'file:/user/hive/warehouse/employee'
+ * 'file:/user/hive/warehouse/employee'
* TBLPROPERTIES (
- * 'bucketing_version'='2',
- * 'transient_lastDdlTime'='1562086077')
+ * 'bucketing_version'='2',
+ * 'transient_lastDdlTime'='1562086077')
*
* @param tableMetadata hive table metadata string
* @return List of FieldSchema
@@ -316,9 +318,9 @@ public class HiveMetaStoreServiceJdbcImpl implements HiveMetaStoreService {
/**
* Parse one column string
- *
+ * <p>
* Input example:
- * `merch_date` string COMMENT 'this is merch process date'
+ * `merch_date` string COMMENT 'this is merch process date'
*
* @param colStr column string
* @return
diff --git a/service/src/main/java/org/apache/griffin/core/metastore/kafka/KafkaSchemaController.java b/service/src/main/java/org/apache/griffin/core/metastore/kafka/KafkaSchemaController.java
index 732a59e..63df077 100644
--- a/service/src/main/java/org/apache/griffin/core/metastore/kafka/KafkaSchemaController.java
+++ b/service/src/main/java/org/apache/griffin/core/metastore/kafka/KafkaSchemaController.java
@@ -48,7 +48,7 @@ public class KafkaSchemaController {
@RequestMapping(value = "/versions", method = RequestMethod.GET)
public Iterable<Integer> getSubjectVersions(
- @RequestParam("subject") String subject) {
+ @RequestParam("subject") String subject) {
return kafkaSchemaService.getSubjectVersions(subject);
}
diff --git a/service/src/main/java/org/apache/griffin/core/metastore/kafka/KafkaSchemaServiceImpl.java b/service/src/main/java/org/apache/griffin/core/metastore/kafka/KafkaSchemaServiceImpl.java
index ba6bba2..89b8985 100644
--- a/service/src/main/java/org/apache/griffin/core/metastore/kafka/KafkaSchemaServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/metastore/kafka/KafkaSchemaServiceImpl.java
@@ -36,7 +36,7 @@ import org.springframework.web.client.RestTemplate;
public class KafkaSchemaServiceImpl implements KafkaSchemaService {
private static final Logger log = LoggerFactory
- .getLogger(KafkaSchemaServiceImpl.class);
+ .getLogger(KafkaSchemaServiceImpl.class);
@Value("${kafka.schema.registry.url}")
private String url;
@@ -59,7 +59,7 @@ public class KafkaSchemaServiceImpl implements KafkaSchemaService {
String path = "/schemas/ids/" + id;
String regUrl = registryUrl(path);
ResponseEntity<SchemaString> res = restTemplate.getForEntity(regUrl,
- SchemaString.class);
+ SchemaString.class);
SchemaString result = res.getBody();
return result;
}
@@ -69,7 +69,7 @@ public class KafkaSchemaServiceImpl implements KafkaSchemaService {
String path = "/subjects";
String regUrl = registryUrl(path);
ResponseEntity<String[]> res = restTemplate.getForEntity(regUrl,
- String[].class);
+ String[].class);
Iterable<String> result = Arrays.asList(res.getBody());
return result;
}
@@ -79,7 +79,7 @@ public class KafkaSchemaServiceImpl implements KafkaSchemaService {
String path = "/subjects/" + subject + "/versions";
String regUrl = registryUrl(path);
ResponseEntity<Integer[]> res = restTemplate.getForEntity(regUrl,
- Integer[].class);
+ Integer[].class);
Iterable<Integer> result = Arrays.asList(res.getBody());
return result;
}
@@ -89,7 +89,7 @@ public class KafkaSchemaServiceImpl implements KafkaSchemaService {
String path = "/subjects/" + subject + "/versions/" + version;
String regUrl = registryUrl(path);
ResponseEntity<Schema> res = restTemplate.getForEntity(regUrl,
- Schema.class);
+ Schema.class);
Schema result = res.getBody();
return result;
}
@@ -99,7 +99,7 @@ public class KafkaSchemaServiceImpl implements KafkaSchemaService {
String path = "/config";
String regUrl = registryUrl(path);
ResponseEntity<Config> res = restTemplate.getForEntity(regUrl,
- Config.class);
+ Config.class);
Config result = res.getBody();
return result;
}
@@ -109,7 +109,7 @@ public class KafkaSchemaServiceImpl implements KafkaSchemaService {
String path = "/config/" + subject;
String regUrl = registryUrl(path);
ResponseEntity<Config> res = restTemplate.getForEntity(regUrl,
- Config.class);
+ Config.class);
Config result = res.getBody();
return result;
}
diff --git a/service/src/main/java/org/apache/griffin/core/metric/MetricController.java b/service/src/main/java/org/apache/griffin/core/metric/MetricController.java
index 5cd43af..23b1238 100644
--- a/service/src/main/java/org/apache/griffin/core/metric/MetricController.java
+++ b/service/src/main/java/org/apache/griffin/core/metric/MetricController.java
@@ -49,32 +49,32 @@ public class MetricController {
@RequestMapping(value = "/metrics/values", method = RequestMethod.GET)
public List<MetricValue> getMetricValues(@RequestParam("metricName")
- String metricName,
+ String metricName,
@RequestParam("size") int size,
@RequestParam(value = "offset",
- defaultValue = "0")
- int offset,
+ defaultValue = "0")
+ int offset,
@RequestParam(value = "tmst",
- defaultValue = "0")
- long tmst) {
+ defaultValue = "0")
+ long tmst) {
return metricService.getMetricValues(metricName, offset, size, tmst);
}
@RequestMapping(value = "/metrics/values", method = RequestMethod.POST)
public ResponseEntity<?> addMetricValues(@RequestBody List<MetricValue>
- values) {
+ values) {
return metricService.addMetricValues(values);
}
@RequestMapping(value = "/metrics/values", method = RequestMethod.DELETE)
@ResponseStatus(HttpStatus.NO_CONTENT)
public ResponseEntity<?> deleteMetricValues(@RequestParam("metricName")
- String metricName) {
+ String metricName) {
return metricService.deleteMetricValues(metricName);
}
@RequestMapping(value = "/metrics/values/{instanceId}", method = RequestMethod.GET)
- public MetricValue getMetric(@PathVariable("instanceId") Long id){
+ public MetricValue getMetric(@PathVariable("instanceId") Long id) {
return metricService.findMetric(id);
}
}
diff --git a/service/src/main/java/org/apache/griffin/core/metric/MetricServiceImpl.java b/service/src/main/java/org/apache/griffin/core/metric/MetricServiceImpl.java
index beca19e..886e6b9 100644
--- a/service/src/main/java/org/apache/griffin/core/metric/MetricServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/metric/MetricServiceImpl.java
@@ -54,7 +54,7 @@ import org.springframework.stereotype.Service;
@Service
public class MetricServiceImpl implements MetricService {
private static final Logger LOGGER = LoggerFactory
- .getLogger(MetricServiceImpl.class);
+ .getLogger(MetricServiceImpl.class);
@Autowired
private MeasureRepo<Measure> measureRepo;
@@ -71,9 +71,9 @@ public class MetricServiceImpl implements MetricService {
List<AbstractJob> jobs = jobRepo.findByDeleted(false);
List<Measure> measures = measureRepo.findByDeleted(false);
Map<Long, Measure> measureMap = measures.stream().collect(Collectors
- .toMap(Measure::getId, Function.identity()));
+ .toMap(Measure::getId, Function.identity()));
Map<Long, List<AbstractJob>> jobMap = jobs.stream().collect(Collectors
- .groupingBy(AbstractJob::getMeasureId, Collectors.toList()));
+ .groupingBy(AbstractJob::getMeasureId, Collectors.toList()));
for (Map.Entry<Long, List<AbstractJob>> entry : jobMap.entrySet()) {
Long measureId = entry.getKey();
Measure measure = measureMap.get(measureId);
@@ -81,9 +81,9 @@ public class MetricServiceImpl implements MetricService {
List<Metric> metrics = new ArrayList<>();
for (AbstractJob job : jobList) {
List<MetricValue> metricValues = getMetricValues(job
- .getMetricName(), 0, 300, job.getCreatedDate());
+ .getMetricName(), 0, 300, job.getCreatedDate());
metrics.add(new Metric(job.getMetricName(), measure.getDqType(),
- measure.getOwner(), metricValues));
+ measure.getOwner(), metricValues));
}
metricMap.put(measure.getName(), metrics);
@@ -96,19 +96,19 @@ public class MetricServiceImpl implements MetricService {
int size, long tmst) {
if (offset < 0) {
throw new GriffinException.BadRequestException
- (INVALID_METRIC_RECORDS_OFFSET);
+ (INVALID_METRIC_RECORDS_OFFSET);
}
if (size < 0) {
throw new GriffinException.BadRequestException
- (INVALID_METRIC_RECORDS_SIZE);
+ (INVALID_METRIC_RECORDS_SIZE);
}
try {
return metricStore.getMetricValues(metricName, offset, size, tmst);
} catch (IOException e) {
LOGGER.error("Failed to get metric values named {}. {}",
- metricName, e.getMessage());
+ metricName, e.getMessage());
throw new GriffinException.ServiceException(
- "Failed to get metric values", e);
+ "Failed to get metric values", e);
}
}
@@ -123,11 +123,11 @@ public class MetricServiceImpl implements MetricService {
} catch (JsonProcessingException e) {
LOGGER.warn("Failed to parse metric value.", e.getMessage());
throw new GriffinException.BadRequestException
- (INVALID_METRIC_VALUE_FORMAT);
+ (INVALID_METRIC_VALUE_FORMAT);
} catch (IOException e) {
LOGGER.error("Failed to add metric values", e);
throw new GriffinException.ServiceException(
- "Failed to add metric values", e);
+ "Failed to add metric values", e);
}
}
@@ -138,19 +138,19 @@ public class MetricServiceImpl implements MetricService {
return metricStore.deleteMetricValues(metricName);
} catch (IOException e) {
LOGGER.error("Failed to delete metric values named {}. {}",
- metricName, e.getMessage());
+ metricName, e.getMessage());
throw new GriffinException.ServiceException(
- "Failed to delete metric values.", e);
+ "Failed to delete metric values.", e);
}
}
@Override
public MetricValue findMetric(Long id) {
JobInstanceBean jobInstanceBean = jobInstanceRepo.findByInstanceId(id);
- if (jobInstanceBean == null){
+ if (jobInstanceBean == null) {
LOGGER.warn("There are no job instances with id {} ", id);
throw new GriffinException
- .NotFoundException(JOB_INSTANCE_NOT_FOUND);
+ .NotFoundException(JOB_INSTANCE_NOT_FOUND);
}
String appId = jobInstanceBean.getAppId();
try {
@@ -163,9 +163,9 @@ public class MetricServiceImpl implements MetricService {
private void checkFormat(MetricValue value) {
if (StringUtils.isBlank(value.getName()) || value.getTmst() == null
- || MapUtils.isEmpty(value.getValue())) {
+ || MapUtils.isEmpty(value.getValue())) {
throw new GriffinException.BadRequestException
- (INVALID_METRIC_VALUE_FORMAT);
+ (INVALID_METRIC_VALUE_FORMAT);
}
}
}
diff --git a/service/src/main/java/org/apache/griffin/core/metric/MetricStore.java b/service/src/main/java/org/apache/griffin/core/metric/MetricStore.java
index f85c4db..9510cce 100644
--- a/service/src/main/java/org/apache/griffin/core/metric/MetricStore.java
+++ b/service/src/main/java/org/apache/griffin/core/metric/MetricStore.java
@@ -31,7 +31,7 @@ public interface MetricStore {
long tmst) throws IOException;
ResponseEntity<?> addMetricValues(List<MetricValue> metricValues)
- throws IOException;
+ throws IOException;
ResponseEntity<?> deleteMetricValues(String metricName) throws IOException;
diff --git a/service/src/main/java/org/apache/griffin/core/metric/MetricStoreImpl.java b/service/src/main/java/org/apache/griffin/core/metric/MetricStoreImpl.java
index bf1d680..cc4ade8 100644
--- a/service/src/main/java/org/apache/griffin/core/metric/MetricStoreImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/metric/MetricStoreImpl.java
@@ -77,8 +77,8 @@ public class MetricStoreImpl implements MetricStore {
if (!user.isEmpty() && !password.isEmpty()) {
String encodedAuth = buildBasicAuthString(user, password);
Header[] requestHeaders = new Header[]{
- new BasicHeader(org.apache.http.HttpHeaders.AUTHORIZATION,
- encodedAuth)};
+ new BasicHeader(org.apache.http.HttpHeaders.AUTHORIZATION,
+ encodedAuth)};
builder.setDefaultHeaders(requestHeaders);
}
this.client = builder.build();
@@ -90,22 +90,22 @@ public class MetricStoreImpl implements MetricStore {
this.urlPost = urlBase.concat("/_bulk");
this.urlDelete = urlBase.concat("/_delete_by_query");
this.indexMetaData = String.format(
- "{ \"index\" : { \"_index\" : " +
- "\"%s\",\"_type\" : \"%s\" } }%n",
- INDEX,
- TYPE);
+ "{ \"index\" : { \"_index\" : " +
+ "\"%s\",\"_type\" : \"%s\" } }%n",
+ INDEX,
+ TYPE);
this.mapper = new ObjectMapper();
}
@Override
public List<MetricValue> getMetricValues(String metricName, int from,
int size, long tmst)
- throws IOException {
+ throws IOException {
HttpEntity entity = getHttpEntityForSearch(metricName, from, size,
- tmst);
+ tmst);
try {
Response response = client.performRequest("GET", urlGet,
- Collections.emptyMap(), entity);
+ Collections.emptyMap(), entity);
return getMetricValuesFromResponse(response);
} catch (ResponseException e) {
if (e.getResponse().getStatusLine().getStatusCode() == 404) {
@@ -116,44 +116,46 @@ public class MetricStoreImpl implements MetricStore {
}
private HttpEntity getHttpEntityForSearch(String metricName, int from, int
- size, long tmst)
- throws JsonProcessingException {
+ size, long tmst)
+ throws JsonProcessingException {
Map<String, Object> map = new HashMap<>();
Map<String, Object> queryParam = new HashMap<>();
Map<String, Object> termQuery = Collections.singletonMap("name.keyword",
- metricName);
+ metricName);
queryParam.put("filter", Collections.singletonMap("term", termQuery));
Map<String, Object> sortParam = Collections
- .singletonMap("tmst", Collections.singletonMap("order",
- "desc"));
+ .singletonMap("tmst", Collections.singletonMap("order",
+ "desc"));
map.put("query", Collections.singletonMap("bool", queryParam));
map.put("sort", sortParam);
map.put("from", from);
map.put("size", size);
return new NStringEntity(JsonUtil.toJson(map),
- ContentType.APPLICATION_JSON);
+ ContentType.APPLICATION_JSON);
}
private List<MetricValue> getMetricValuesFromResponse(Response response)
- throws IOException {
+ throws IOException {
List<MetricValue> metricValues = new ArrayList<>();
JsonNode jsonNode = mapper.readTree(EntityUtils.toString(response
- .getEntity()));
+ .getEntity()));
if (jsonNode.hasNonNull("hits") && jsonNode.get("hits")
- .hasNonNull("hits")) {
+ .hasNonNull("hits")) {
for (JsonNode node : jsonNode.get("hits").get("hits")) {
JsonNode sourceNode = node.get("_source");
Map<String, Object> value = JsonUtil.toEntity(
- sourceNode.get("value").toString(),
- new TypeReference<Map<String, Object>>() {});
+ sourceNode.get("value").toString(),
+ new TypeReference<Map<String, Object>>() {
+ });
Map<String, Object> meta = JsonUtil.toEntity(
- Objects.toString(sourceNode.get("metadata"), null),
- new TypeReference<Map<String, Object>>() {});
+ Objects.toString(sourceNode.get("metadata"), null),
+ new TypeReference<Map<String, Object>>() {
+ });
MetricValue metricValue = new MetricValue(
- sourceNode.get("name").asText(),
- Long.parseLong(sourceNode.get("tmst").asText()),
- meta,
- value);
+ sourceNode.get("name").asText(),
+ Long.parseLong(sourceNode.get("tmst").asText()),
+ meta,
+ value);
metricValues.add(metricValue);
}
}
@@ -162,17 +164,17 @@ public class MetricStoreImpl implements MetricStore {
@Override
public ResponseEntity<?> addMetricValues(List<MetricValue> metricValues)
- throws IOException {
+ throws IOException {
String bulkRequestBody = getBulkRequestBody(metricValues);
HttpEntity entity = new NStringEntity(bulkRequestBody,
- ContentType.APPLICATION_JSON);
+ ContentType.APPLICATION_JSON);
Response response = client.performRequest("POST", urlPost,
- Collections.emptyMap(), entity);
+ Collections.emptyMap(), entity);
return getResponseEntityFromResponse(response);
}
private String getBulkRequestBody(List<MetricValue> metricValues) throws
- JsonProcessingException {
+ JsonProcessingException {
StringBuilder bulkRequestBody = new StringBuilder();
for (MetricValue metricValue : metricValues) {
bulkRequestBody.append(indexMetaData);
@@ -184,38 +186,38 @@ public class MetricStoreImpl implements MetricStore {
@Override
public ResponseEntity<?> deleteMetricValues(String metricName) throws
- IOException {
+ IOException {
Map<String, Object> param = Collections.singletonMap("query",
- Collections.singletonMap("term",
- Collections.singletonMap("name.keyword", metricName)));
+ Collections.singletonMap("term",
+ Collections.singletonMap("name.keyword", metricName)));
HttpEntity entity = new NStringEntity(
- JsonUtil.toJson(param),
- ContentType.APPLICATION_JSON);
+ JsonUtil.toJson(param),
+ ContentType.APPLICATION_JSON);
Response response = client.performRequest("POST", urlDelete,
- Collections.emptyMap(), entity);
+ Collections.emptyMap(), entity);
return getResponseEntityFromResponse(response);
}
private ResponseEntity<?> getResponseEntityFromResponse(Response response)
- throws IOException {
+ throws IOException {
String body = EntityUtils.toString(response.getEntity());
HttpStatus status = HttpStatus.valueOf(response.getStatusLine()
- .getStatusCode());
+ .getStatusCode());
return new ResponseEntity<>(body, responseHeaders, status);
}
private static String buildBasicAuthString(String user, String password) {
String auth = user + ":" + password;
return String.format("Basic %s", Base64.getEncoder().encodeToString(
- auth.getBytes()));
+ auth.getBytes()));
}
@Override
public MetricValue getMetric(String applicationId) throws IOException {
Response response = client.performRequest(
- "GET", urlGet,
- Collections.singletonMap(
- "q", "metadata.applicationId:" + applicationId));
+ "GET", urlGet,
+ Collections.singletonMap(
+ "q", "metadata.applicationId:" + applicationId));
List<MetricValue> metricValues = getMetricValuesFromResponse(response);
return metricValues.get(0);
}
diff --git a/service/src/main/java/org/apache/griffin/core/metric/model/MetricValue.java b/service/src/main/java/org/apache/griffin/core/metric/model/MetricValue.java
index 4c7f386..a540c9b 100644
--- a/service/src/main/java/org/apache/griffin/core/metric/model/MetricValue.java
+++ b/service/src/main/java/org/apache/griffin/core/metric/model/MetricValue.java
@@ -89,9 +89,9 @@ public class MetricValue {
if (o == null || getClass() != o.getClass()) return false;
MetricValue that = (MetricValue) o;
return Objects.equals(name, that.name) &&
- Objects.equals(tmst, that.tmst) &&
- Objects.equals(metadata, that.metadata) &&
- Objects.equals(value, that.value);
+ Objects.equals(tmst, that.tmst) &&
+ Objects.equals(metadata, that.metadata) &&
+ Objects.equals(value, that.value);
}
@Override
@@ -102,7 +102,7 @@ public class MetricValue {
@Override
public String toString() {
return String.format(
- "MetricValue{name=%s, ts=%s, meta=%s, value=%s}",
- name, tmst, metadata, value);
+ "MetricValue{name=%s, ts=%s, meta=%s, value=%s}",
+ name, tmst, metadata, value);
}
}
diff --git a/service/src/main/java/org/apache/griffin/core/util/FSUtil.java b/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
index b537ac6..b6985a4 100644
--- a/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
@@ -54,6 +54,7 @@ public class FSUtil {
private static FileSystem fileSystem;
private static FileSystem defaultFS = getDefaultFileSystem();
+
private static FileSystem getDefaultFileSystem() {
FileSystem fs = null;
Configuration conf = new Configuration();
@@ -173,12 +174,12 @@ public class FSUtil {
}
public static InputStream getSampleInputStream(String path)
- throws IOException {
+ throws IOException {
checkHDFSConf();
if (isFileExist(path)) {
FSDataInputStream missingData = fileSystem.open(new Path(path));
BufferedReader bufReader = new BufferedReader(
- new InputStreamReader(missingData, Charsets.UTF_8));
+ new InputStreamReader(missingData, Charsets.UTF_8));
try {
String line = null;
int rowCnt = 0;
@@ -205,16 +206,16 @@ public class FSUtil {
private static void checkHDFSConf() {
if (getFileSystem() == null) {
throw new NullPointerException("FileSystem is null. " +
- "Please check your hdfs config default name.");
+ "Please check your hdfs config default name.");
}
}
public static String getFirstMissRecordPath(String hdfsDir)
- throws Exception {
+ throws Exception {
List<FileStatus> fileList = listFileStatus(hdfsDir);
for (int i = 0; i < fileList.size(); i++) {
if (fileList.get(i).getPath().toUri().toString().toLowerCase()
- .contains("missrecord")) {
+ .contains("missrecord")) {
return fileList.get(i).getPath().toUri().toString();
}
}
@@ -222,12 +223,12 @@ public class FSUtil {
}
public static InputStream getMissSampleInputStream(String path)
- throws Exception {
+ throws Exception {
List<String> subDirList = listSubDir(path);
//FIXME: only handle 1-sub dir here now
for (int i = 0; i < subDirList.size(); i++) {
return getSampleInputStream(getFirstMissRecordPath(
- subDirList.get(i)));
+ subDirList.get(i)));
}
return getSampleInputStream(getFirstMissRecordPath(path));
}
diff --git a/service/src/main/java/org/apache/griffin/core/util/FileUtil.java b/service/src/main/java/org/apache/griffin/core/util/FileUtil.java
index d2209f9..03efaf2 100644
--- a/service/src/main/java/org/apache/griffin/core/util/FileUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/FileUtil.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
public class FileUtil {
private static final Logger LOGGER = LoggerFactory
- .getLogger(FileUtil.class);
+ .getLogger(FileUtil.class);
public static String getFilePath(String name, String location) {
if (StringUtils.isEmpty(location)) {
@@ -39,7 +39,7 @@ public class FileUtil {
File[] files = file.listFiles();
if (files == null) {
LOGGER.warn("The external location '{}' does not exist.Read from"
- + "default path.", location);
+ + "default path.", location);
return null;
}
return getFilePath(name, files, location);
diff --git a/service/src/main/java/org/apache/griffin/core/util/JsonUtil.java b/service/src/main/java/org/apache/griffin/core/util/JsonUtil.java
index 0fe6449..0c1ef98 100644
--- a/service/src/main/java/org/apache/griffin/core/util/JsonUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/JsonUtil.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
public class JsonUtil {
private static final Logger LOGGER = LoggerFactory
- .getLogger(JsonUtil.class);
+ .getLogger(JsonUtil.class);
public static String toJson(Object obj) throws JsonProcessingException {
if (obj == null) {
@@ -47,30 +47,30 @@ public class JsonUtil {
}
public static String toJsonWithFormat(Object obj)
- throws JsonProcessingException {
+ throws JsonProcessingException {
if (obj == null) {
LOGGER.warn("Object to be formatted cannot be empty!");
return null;
}
ObjectWriter mapper = new ObjectMapper().writer()
- .withDefaultPrettyPrinter();
+ .withDefaultPrettyPrinter();
return mapper.writeValueAsString(obj);
}
public static <T> T toEntity(String jsonStr, Class<T> type)
- throws IOException {
+ throws IOException {
if (StringUtils.isEmpty(jsonStr)) {
LOGGER.warn("Json string {} is empty!", type);
return null;
}
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
- false);
+ false);
return mapper.readValue(jsonStr, type);
}
public static <T> T toEntity(File file, TypeReference type)
- throws IOException {
+ throws IOException {
if (file == null) {
LOGGER.warn("File cannot be empty!");
return null;
@@ -80,7 +80,7 @@ public class JsonUtil {
}
public static <T> T toEntity(InputStream in, TypeReference type)
- throws IOException {
+ throws IOException {
if (in == null) {
throw new NullPointerException("Input stream cannot be null.");
}
@@ -89,7 +89,7 @@ public class JsonUtil {
}
public static <T> T toEntity(String jsonStr, TypeReference type)
- throws IOException {
+ throws IOException {
if (StringUtils.isEmpty(jsonStr)) {
LOGGER.warn("Json string {} is empty!", type);
return null;
diff --git a/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java b/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
index 2a36886..7e57d87 100644
--- a/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
@@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory;
public class MeasureUtil {
private static final Logger LOGGER = LoggerFactory
- .getLogger(MeasureUtil.class);
+ .getLogger(MeasureUtil.class);
public static void validateMeasure(Measure measure) {
if (measure instanceof GriffinMeasure) {
@@ -56,7 +56,7 @@ public class MeasureUtil {
private static void validateGriffinMeasure(GriffinMeasure measure) {
if (getConnectorNamesIfValid(measure) == null) {
throw new GriffinException.BadRequestException
- (INVALID_CONNECTOR_NAME);
+ (INVALID_CONNECTOR_NAME);
}
if (!validatePredicates(measure)) {
throw new GriffinException.BadRequestException(INVALID_MEASURE_PREDICATE);
@@ -65,7 +65,7 @@ public class MeasureUtil {
private static boolean validatePredicates(GriffinMeasure measure) {
for (DataSource dataSource : measure.getDataSources()) {
- for (DataConnector dataConnector: dataSource.getConnectors()) {
+ for (DataConnector dataConnector : dataSource.getConnectors()) {
for (SegmentPredicate segmentPredicate : dataConnector.getPredicates()) {
try {
PredicatorFactory.newPredicateInstance(segmentPredicate);
@@ -81,7 +81,7 @@ public class MeasureUtil {
private static void validateExternalMeasure(ExternalMeasure measure) {
if (StringUtils.isBlank(measure.getMetricName())) {
LOGGER.warn("Failed to create external measure {}. " +
- "Its metric name is blank.", measure.getName());
+ "Its metric name is blank.", measure.getName());
throw new GriffinException.BadRequestException(MISSING_METRIC_NAME);
}
}
@@ -91,7 +91,7 @@ public class MeasureUtil {
List<DataSource> sources = measure.getDataSources();
for (DataSource source : sources) {
source.getConnectors().stream().filter(dc -> dc.getName() != null)
- .forEach(dc -> sets.add(dc.getName()));
+ .forEach(dc -> sets.add(dc.getName()));
}
if (sets.size() == 0 || sets.size() < sources.size()) {
LOGGER.warn("Connector names cannot be repeated or empty.");
diff --git a/service/src/main/java/org/apache/griffin/core/util/PropertiesUtil.java b/service/src/main/java/org/apache/griffin/core/util/PropertiesUtil.java
index 65f1707..6e5a8f7 100644
--- a/service/src/main/java/org/apache/griffin/core/util/PropertiesUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/PropertiesUtil.java
@@ -35,7 +35,7 @@ import org.springframework.core.io.Resource;
public class PropertiesUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(
- PropertiesUtil.class);
+ PropertiesUtil.class);
public static Properties getProperties(String path, Resource resource) {
PropertiesFactoryBean propFactoryBean = new PropertiesFactoryBean();
@@ -61,7 +61,7 @@ public class PropertiesUtil {
*/
public static Properties getConf(String name, String defaultPath,
String location)
- throws FileNotFoundException {
+ throws FileNotFoundException {
String path = getConfPath(name, location);
Resource resource;
if (path == null) {
diff --git a/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java b/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java
index 17e08d6..077b658 100644
--- a/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java
@@ -34,17 +34,17 @@ import org.slf4j.LoggerFactory;
public class TimeUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(TimeUtil
- .class);
+ .class);
private static final String MILLISECONDS_PATTERN =
- "(?i)m(illi)?s(ec(ond)?)?";
+ "(?i)m(illi)?s(ec(ond)?)?";
private static final String SECONDS_PATTERN =
- "(?i)s(ec(ond)?)?";
+ "(?i)s(ec(ond)?)?";
private static final String MINUTES_PATTERN =
- "(?i)m(in(ute)?)?";
+ "(?i)m(in(ute)?)?";
private static final String HOURS_PATTERN =
- "(?i)h((ou)?r)?";
+ "(?i)h((ou)?r)?";
private static final String DAYS_PATTERN =
- "(?i)d(ay)?";
+ "(?i)d(ay)?";
private static class TimeUnitPair {
private long t;
@@ -114,9 +114,9 @@ public class TimeUtil {
return milliseconds(t, TimeUnit.DAYS);
} else {
LOGGER.warn("Time string format ERROR. " +
- "It only supports d(day),h(hour), m(minute), " +
- "s(second), ms(millsecond). " +
- "Please check your time format.");
+ "It only supports d(day),h(hour), m(minute), " +
+ "s(second), ms(millsecond). " +
+ "Please check your time format.");
return 0L;
}
}
diff --git a/service/src/main/java/org/apache/griffin/core/util/YarnNetUtil.java b/service/src/main/java/org/apache/griffin/core/util/YarnNetUtil.java
index f8b3766..10b41ae 100644
--- a/service/src/main/java/org/apache/griffin/core/util/YarnNetUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/YarnNetUtil.java
@@ -35,13 +35,13 @@ import org.springframework.web.client.RestTemplate;
public class YarnNetUtil {
private static final Logger LOGGER = LoggerFactory
- .getLogger(YarnNetUtil.class);
+ .getLogger(YarnNetUtil.class);
private static RestTemplate restTemplate = new RestTemplate();
/**
* delete app task scheduling by yarn.
*
- * @param url prefix part of whole url
+ * @param url prefix part of whole url
* @param appId application id
*/
public static void delete(String url, String appId) {
@@ -49,12 +49,12 @@ public class YarnNetUtil {
if (appId != null) {
LOGGER.info("{} will delete by yarn", appId);
restTemplate.put(url + "ws/v1/cluster/apps/"
- + appId + "/state",
- "{\"state\": \"KILLED\"}");
+ + appId + "/state",
+ "{\"state\": \"KILLED\"}");
}
} catch (HttpClientErrorException e) {
LOGGER.warn("client error {} from yarn: {}",
- e.getMessage(), e.getResponseBodyAsString());
+ e.getMessage(), e.getResponseBodyAsString());
} catch (Exception e) {
LOGGER.error("delete exception happens by yarn. {}", e);
}
@@ -63,7 +63,7 @@ public class YarnNetUtil {
/**
* update app task scheduling by yarn.
*
- * @param url prefix part of whole url
+ * @param url prefix part of whole url
* @param instance job instance
* @return
*/
@@ -78,7 +78,7 @@ public class YarnNetUtil {
return true;
} catch (HttpClientErrorException e) {
LOGGER.warn("client error {} from yarn: {}",
- e.getMessage(), e.getResponseBodyAsString());
+ e.getMessage(), e.getResponseBodyAsString());
if (e.getStatusCode() == HttpStatus.NOT_FOUND) {
// in sync with Livy behavior, see com.cloudera.livy.utils.SparkYarnApp
instance.setState(DEAD);
diff --git a/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java b/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
index 71b0887..543432e 100644
--- a/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
+++ b/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
@@ -51,7 +51,6 @@ import org.springframework.core.io.ClassPathResource;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.client.RestTemplate;
-
@RunWith(SpringRunner.class)
public class SparkSubmitJobTest {
@@ -151,9 +150,9 @@ public class SparkSubmitJobTest {
given(context.getJobDetail()).willReturn(jd);
given(jobInstanceRepo.findByPredicateName(Matchers.anyString()))
.willReturn(instance);
- Whitebox.setInternalState(sparkSubmitJob, "restTemplate", restTemplate);
- given(restTemplate.postForObject(Matchers.anyString(), Matchers.any(),
- Matchers.any())).willReturn(result);
+// Whitebox.setInternalState(sparkSubmitJob, "restTemplate", restTemplate);
+// given(restTemplate.postForObject(Matchers.anyString(), Matchers.any(),
+// Matchers.any())).willReturn(result);
sparkSubmitJob.execute(context);