You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/03/16 18:16:41 UTC

[helix] branch master updated: Support enableCompression in workflow and job configs (#883)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new f96a3c4  Support enableCompression in workflow and job configs (#883)
f96a3c4 is described below

commit f96a3c4f17d0c25b3fce623f562c8ecf852f03f2
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Mon Mar 16 11:16:34 2020 -0700

    Support enableCompression in workflow and job configs (#883)
    
    Previously, there was no way for users to set enableCompression in workflows and jobs configs. This allows them to set that field in the Builder.
---
 .../main/java/org/apache/helix/task/JobConfig.java | 22 ++++++++++++++++++----
 .../java/org/apache/helix/task/TaskConstants.java  |  2 +-
 .../java/org/apache/helix/task/WorkflowConfig.java | 13 +++++++++++++
 .../java/org/apache/helix/task/beans/JobBean.java  |  1 +
 .../org/apache/helix/task/beans/WorkflowBean.java  |  2 ++
 5 files changed, 35 insertions(+), 5 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index dafdb9a..3a0fb92 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -35,6 +35,8 @@ import org.apache.helix.HelixProperty;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.task.beans.JobBean;
 import org.apache.helix.task.beans.TaskBean;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+
 
 /**
  * Provides a typed interface to job configurations.
@@ -455,6 +457,7 @@ public class JobConfig extends ResourceConfig {
     private boolean _ignoreDependentJobFailure = DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
     private int _numberOfTasks = DEFAULT_NUMBER_OF_TASKS;
     private boolean _rebalanceRunningTask = DEFAULT_REBALANCE_RUNNING_TASK;
+    private boolean _enableCompression = TaskConstants.DEFAULT_TASK_ENABLE_COMPRESSION;
 
     public JobConfig build() {
       if (_targetResource == null && _taskConfigMap.isEmpty()) {
@@ -536,11 +539,11 @@ public class JobConfig extends ResourceConfig {
       }
       if (cfg.containsKey(JobConfigProperty.DisableExternalView.name())) {
         b.setDisableExternalView(
-            Boolean.valueOf(cfg.get(JobConfigProperty.DisableExternalView.name())));
+            Boolean.parseBoolean(cfg.get(JobConfigProperty.DisableExternalView.name())));
       }
       if (cfg.containsKey(JobConfigProperty.IgnoreDependentJobFailure.name())) {
         b.setIgnoreDependentJobFailure(
-            Boolean.valueOf(cfg.get(JobConfigProperty.IgnoreDependentJobFailure.name())));
+            Boolean.parseBoolean(cfg.get(JobConfigProperty.IgnoreDependentJobFailure.name())));
       }
       if (cfg.containsKey(JobConfigProperty.JobType.name())) {
         b.setJobType(cfg.get(JobConfigProperty.JobType.name()));
@@ -553,7 +556,11 @@ public class JobConfig extends ResourceConfig {
       }
       if (cfg.containsKey(JobConfigProperty.RebalanceRunningTask.name())) {
         b.setRebalanceRunningTask(
-            Boolean.valueOf(cfg.get(JobConfigProperty.RebalanceRunningTask.name())));
+            Boolean.parseBoolean(cfg.get(JobConfigProperty.RebalanceRunningTask.name())));
+      }
+      if (cfg.containsKey(ZNRecord.ENABLE_COMPRESSION_BOOLEAN_FIELD)) {
+        b.setEnableCompression(
+            Boolean.parseBoolean(cfg.get(ZNRecord.ENABLE_COMPRESSION_BOOLEAN_FIELD)));
       }
       return b;
     }
@@ -689,6 +696,11 @@ public class JobConfig extends ResourceConfig {
       return this;
     }
 
+    public Builder setEnableCompression(boolean enabled) {
+      _enableCompression = enabled;
+      return this;
+    }
+
     private void validate() {
       if (_taskConfigMap.isEmpty() && _targetResource == null) {
         throw new IllegalArgumentException(
@@ -760,7 +772,8 @@ public class JobConfig extends ResourceConfig {
           .setIgnoreDependentJobFailure(jobBean.ignoreDependentJobFailure)
           .setNumberOfTasks(jobBean.numberOfTasks).setExecutionDelay(jobBean.executionDelay)
           .setExecutionStart(jobBean.executionStart)
-          .setRebalanceRunningTask(jobBean.rebalanceRunningTask);
+          .setRebalanceRunningTask(jobBean.rebalanceRunningTask)
+          .setEnableCompression(jobBean.enableCompression);
 
       if (jobBean.jobCommandConfigMap != null) {
         b.setJobCommandConfigMap(jobBean.jobCommandConfigMap);
@@ -790,6 +803,7 @@ public class JobConfig extends ResourceConfig {
       if (jobBean.instanceGroupTag != null) {
         b.setInstanceGroupTag(jobBean.instanceGroupTag);
       }
+      b.setEnableCompression(jobBean.enableCompression);
       return b;
     }
 
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
index c7273ad..d17e29a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
@@ -48,5 +48,5 @@ public class TaskConstants {
 
   public static final String PREV_RA_NODE = "PreviousResourceAssignment";
 
-
+  public static final boolean DEFAULT_TASK_ENABLE_COMPRESSION = false;
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index bff0a65..5392513 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -31,6 +31,7 @@ import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.task.beans.WorkflowBean;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -338,6 +339,7 @@ public class WorkflowConfig extends ResourceConfig {
     private long _jobPurgeInterval = DEFAULT_JOB_PURGE_INTERVAL;
     private boolean _allowOverlapJobAssignment = DEFAULT_ALLOW_OVERLAP_JOB_ASSIGNMENT;
     private long _timeout = TaskConstants.DEFAULT_NEVER_TIMEOUT;
+    private boolean _enableCompression = TaskConstants.DEFAULT_TASK_ENABLE_COMPRESSION;
 
     public WorkflowConfig build() {
       validate();
@@ -492,6 +494,11 @@ public class WorkflowConfig extends ResourceConfig {
       return this;
     }
 
+    public Builder setEnableCompression(boolean enableCompression) {
+      _enableCompression = enableCompression;
+      return this;
+    }
+
     @Deprecated
     public static Builder fromMap(Map<String, String> cfg) {
       Builder builder = new Builder();
@@ -552,6 +559,11 @@ public class WorkflowConfig extends ResourceConfig {
         }
       }
 
+      if (cfg.containsKey(ZNRecord.ENABLE_COMPRESSION_BOOLEAN_FIELD)) {
+        setEnableCompression(
+            Boolean.parseBoolean(cfg.get(ZNRecord.ENABLE_COMPRESSION_BOOLEAN_FIELD)));
+      }
+
       // Parse schedule-specific configs, if they exist
       ScheduleConfig scheduleConfig = parseScheduleFromConfigMap(cfg);
       if (scheduleConfig != null) {
@@ -625,6 +637,7 @@ public class WorkflowConfig extends ResourceConfig {
       }
       b.setExpiry(workflowBean.expiry);
       b.setWorkFlowType(workflowBean.workflowType);
+      b.setEnableCompression(workflowBean.enableCompression);
       return b;
     }
 
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index 5674d92..db41188 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -51,4 +51,5 @@ public class JobBean {
   public boolean ignoreDependentJobFailure = JobConfig.DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
   public int numberOfTasks = JobConfig.DEFAULT_NUMBER_OF_TASKS;
   public boolean rebalanceRunningTask = JobConfig.DEFAULT_REBALANCE_RUNNING_TASK;
+  public boolean enableCompression = TaskConstants.DEFAULT_TASK_ENABLE_COMPRESSION;
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
index 2a9563e..91db9b9 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
@@ -21,6 +21,7 @@ package org.apache.helix.task.beans;
 
 import java.util.List;
 
+import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.WorkflowConfig;
 
 /**
@@ -32,4 +33,5 @@ public class WorkflowBean {
   public ScheduleBean schedule;
   public long expiry = WorkflowConfig.DEFAULT_EXPIRY;
   public String workflowType;
+  public boolean enableCompression = TaskConstants.DEFAULT_TASK_ENABLE_COMPRESSION;
 }