You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/03/16 18:16:41 UTC
[helix] branch master updated: Support enableCompression in workflow and job configs (#883)
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new f96a3c4 Support enableCompression in workflow and job configs (#883)
f96a3c4 is described below
commit f96a3c4f17d0c25b3fce623f562c8ecf852f03f2
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Mon Mar 16 11:16:34 2020 -0700
Support enableCompression in workflow and job configs (#883)
Previously, there was no way for users to set enableCompression in workflow and job configs. This change allows them to set that field through the corresponding Builder.
---
.../main/java/org/apache/helix/task/JobConfig.java | 22 ++++++++++++++++++----
.../java/org/apache/helix/task/TaskConstants.java | 2 +-
.../java/org/apache/helix/task/WorkflowConfig.java | 13 +++++++++++++
.../java/org/apache/helix/task/beans/JobBean.java | 1 +
.../org/apache/helix/task/beans/WorkflowBean.java | 2 ++
5 files changed, 35 insertions(+), 5 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index dafdb9a..3a0fb92 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -35,6 +35,8 @@ import org.apache.helix.HelixProperty;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.task.beans.JobBean;
import org.apache.helix.task.beans.TaskBean;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+
/**
* Provides a typed interface to job configurations.
@@ -455,6 +457,7 @@ public class JobConfig extends ResourceConfig {
private boolean _ignoreDependentJobFailure = DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
private int _numberOfTasks = DEFAULT_NUMBER_OF_TASKS;
private boolean _rebalanceRunningTask = DEFAULT_REBALANCE_RUNNING_TASK;
+ private boolean _enableCompression = TaskConstants.DEFAULT_TASK_ENABLE_COMPRESSION;
public JobConfig build() {
if (_targetResource == null && _taskConfigMap.isEmpty()) {
@@ -536,11 +539,11 @@ public class JobConfig extends ResourceConfig {
}
if (cfg.containsKey(JobConfigProperty.DisableExternalView.name())) {
b.setDisableExternalView(
- Boolean.valueOf(cfg.get(JobConfigProperty.DisableExternalView.name())));
+ Boolean.parseBoolean(cfg.get(JobConfigProperty.DisableExternalView.name())));
}
if (cfg.containsKey(JobConfigProperty.IgnoreDependentJobFailure.name())) {
b.setIgnoreDependentJobFailure(
- Boolean.valueOf(cfg.get(JobConfigProperty.IgnoreDependentJobFailure.name())));
+ Boolean.parseBoolean(cfg.get(JobConfigProperty.IgnoreDependentJobFailure.name())));
}
if (cfg.containsKey(JobConfigProperty.JobType.name())) {
b.setJobType(cfg.get(JobConfigProperty.JobType.name()));
@@ -553,7 +556,11 @@ public class JobConfig extends ResourceConfig {
}
if (cfg.containsKey(JobConfigProperty.RebalanceRunningTask.name())) {
b.setRebalanceRunningTask(
- Boolean.valueOf(cfg.get(JobConfigProperty.RebalanceRunningTask.name())));
+ Boolean.parseBoolean(cfg.get(JobConfigProperty.RebalanceRunningTask.name())));
+ }
+ if (cfg.containsKey(ZNRecord.ENABLE_COMPRESSION_BOOLEAN_FIELD)) {
+ b.setEnableCompression(
+ Boolean.parseBoolean(cfg.get(ZNRecord.ENABLE_COMPRESSION_BOOLEAN_FIELD)));
}
return b;
}
@@ -689,6 +696,11 @@ public class JobConfig extends ResourceConfig {
return this;
}
+ public Builder setEnableCompression(boolean enabled) {
+ _enableCompression = enabled;
+ return this;
+ }
+
private void validate() {
if (_taskConfigMap.isEmpty() && _targetResource == null) {
throw new IllegalArgumentException(
@@ -760,7 +772,8 @@ public class JobConfig extends ResourceConfig {
.setIgnoreDependentJobFailure(jobBean.ignoreDependentJobFailure)
.setNumberOfTasks(jobBean.numberOfTasks).setExecutionDelay(jobBean.executionDelay)
.setExecutionStart(jobBean.executionStart)
- .setRebalanceRunningTask(jobBean.rebalanceRunningTask);
+ .setRebalanceRunningTask(jobBean.rebalanceRunningTask)
+ .setEnableCompression(jobBean.enableCompression);
if (jobBean.jobCommandConfigMap != null) {
b.setJobCommandConfigMap(jobBean.jobCommandConfigMap);
@@ -790,6 +803,7 @@ public class JobConfig extends ResourceConfig {
if (jobBean.instanceGroupTag != null) {
b.setInstanceGroupTag(jobBean.instanceGroupTag);
}
+ b.setEnableCompression(jobBean.enableCompression);
return b;
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
index c7273ad..d17e29a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
@@ -48,5 +48,5 @@ public class TaskConstants {
public static final String PREV_RA_NODE = "PreviousResourceAssignment";
-
+ public static final boolean DEFAULT_TASK_ENABLE_COMPRESSION = false;
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index bff0a65..5392513 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -31,6 +31,7 @@ import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.task.beans.WorkflowBean;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -338,6 +339,7 @@ public class WorkflowConfig extends ResourceConfig {
private long _jobPurgeInterval = DEFAULT_JOB_PURGE_INTERVAL;
private boolean _allowOverlapJobAssignment = DEFAULT_ALLOW_OVERLAP_JOB_ASSIGNMENT;
private long _timeout = TaskConstants.DEFAULT_NEVER_TIMEOUT;
+ private boolean _enableCompression = TaskConstants.DEFAULT_TASK_ENABLE_COMPRESSION;
public WorkflowConfig build() {
validate();
@@ -492,6 +494,11 @@ public class WorkflowConfig extends ResourceConfig {
return this;
}
+ public Builder setEnableCompression(boolean enableCompression) {
+ _enableCompression = enableCompression;
+ return this;
+ }
+
@Deprecated
public static Builder fromMap(Map<String, String> cfg) {
Builder builder = new Builder();
@@ -552,6 +559,11 @@ public class WorkflowConfig extends ResourceConfig {
}
}
+ if (cfg.containsKey(ZNRecord.ENABLE_COMPRESSION_BOOLEAN_FIELD)) {
+ setEnableCompression(
+ Boolean.parseBoolean(cfg.get(ZNRecord.ENABLE_COMPRESSION_BOOLEAN_FIELD)));
+ }
+
// Parse schedule-specific configs, if they exist
ScheduleConfig scheduleConfig = parseScheduleFromConfigMap(cfg);
if (scheduleConfig != null) {
@@ -625,6 +637,7 @@ public class WorkflowConfig extends ResourceConfig {
}
b.setExpiry(workflowBean.expiry);
b.setWorkFlowType(workflowBean.workflowType);
+ b.setEnableCompression(workflowBean.enableCompression);
return b;
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index 5674d92..db41188 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -51,4 +51,5 @@ public class JobBean {
public boolean ignoreDependentJobFailure = JobConfig.DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
public int numberOfTasks = JobConfig.DEFAULT_NUMBER_OF_TASKS;
public boolean rebalanceRunningTask = JobConfig.DEFAULT_REBALANCE_RUNNING_TASK;
+ public boolean enableCompression = TaskConstants.DEFAULT_TASK_ENABLE_COMPRESSION;
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
index 2a9563e..91db9b9 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
@@ -21,6 +21,7 @@ package org.apache.helix.task.beans;
import java.util.List;
+import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.WorkflowConfig;
/**
@@ -32,4 +33,5 @@ public class WorkflowBean {
public ScheduleBean schedule;
public long expiry = WorkflowConfig.DEFAULT_EXPIRY;
public String workflowType;
+ public boolean enableCompression = TaskConstants.DEFAULT_TASK_ENABLE_COMPRESSION;
}