Posted to commits@helix.apache.org by lx...@apache.org on 2016/12/19 17:54:11 UTC

[1/9] helix git commit: [HELIX-641] Add total message received for each instance

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x f08d867cb -> c8c677405


[HELIX-641] Add total message received for each instance
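
For illustration only (not part of this commit): a minimal sketch of reading the new counter
over plain JMX. The attribute name "TotalMessageReceived" follows from the
getTotalMessageReceived() getter added below; the JMX service URL and the
"ClusterStatus:cluster=...,instanceName=..." ObjectName layout are assumptions and may need
to be adjusted for your deployment.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ReadTotalMessageReceived {
  public static void main(String[] args) throws Exception {
    // Assumed JMX endpoint of a Helix controller; adjust host/port as needed.
    JMXServiceURL url =
        new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
    JMXConnector connector = JMXConnectorFactory.connect(url);
    try {
      MBeanServerConnection mbs = connector.getMBeanServerConnection();
      // Assumed ObjectName layout for the per-instance bean registered by ClusterStatusMonitor.
      ObjectName instanceBean =
          new ObjectName("ClusterStatus:cluster=MyCluster,instanceName=localhost_12918");
      long total = (Long) mbs.getAttribute(instanceBean, "TotalMessageReceived");
      System.out.println("Total messages received: " + total);
    } finally {
      connector.close();
    }
  }
}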


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a9267e55
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a9267e55
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a9267e55

Branch: refs/heads/helix-0.6.x
Commit: a9267e55345d72d0255a9dae2c31ab4dc27acad1
Parents: f08d867
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Dec 15 16:45:14 2016 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Dec 15 16:45:14 2016 -0800

----------------------------------------------------------------------
 .../controller/stages/TaskAssignmentStage.java  |  4 +++
 .../monitoring/mbeans/ClusterStatusMonitor.java | 27 ++++++++++++++++++++
 .../monitoring/mbeans/InstanceMonitor.java      | 15 +++++++++++
 .../monitoring/mbeans/InstanceMonitorMBean.java |  6 +++++
 4 files changed, 52 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/a9267e55/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index 1adcded..85ca4cf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -36,6 +36,7 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
 public class TaskAssignmentStage extends AbstractBaseStage {
@@ -73,6 +74,9 @@ public class TaskAssignmentStage extends AbstractBaseStage {
         batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveInstanceMap,
             manager.getProperties());
     sendMessages(dataAccessor, outputMessages);
+    ClusterStatusMonitor clusterStatusMonitor =
+        (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+    clusterStatusMonitor.increaseMessagePerInstance(outputMessages);
 
     long cacheStart = System.currentTimeMillis();
     cache.cacheMessages(outputMessages);

http://git-wip-us.apache.org/repos/asf/helix/blob/a9267e55/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 7f996c5..3ce9a65 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -36,6 +36,7 @@ import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
@@ -252,6 +253,32 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   }
 
   /**
+   * Update message count per instance
+   * @param messages a list of messages
+   */
+  public void increaseMessagePerInstance(List<Message> messages) {
+    Map<String, Long> messageCount = new HashMap<String, Long>();
+
+    // Aggregate messages
+    for (Message message : messages) {
+      String instanceName = message.getAttribute(Message.Attributes.TGT_NAME);
+
+      // Ignore messages that do not have a target instance name
+      if (instanceName == null) {
+        continue;
+      }
+      messageCount.put(instanceName, messageCount.getOrDefault(instanceName, 0L) + 1L);
+    }
+
+    // Update message count per instance
+    for (String instance : messageCount.keySet()) {
+      if (_instanceMbeanMap.containsKey(instance)) {
+        _instanceMbeanMap.get(instance).updateMessageCount(messageCount.get(instance));
+      }
+    }
+  }
+
+  /**
    * Update gauges for resource at instance level
    * @param bestPossibleStates
    * @param resourceMap

http://git-wip-us.apache.org/repos/asf/helix/blob/a9267e55/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
index d9875cc..dc4a0a5 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
@@ -37,6 +37,7 @@ public class InstanceMonitor implements InstanceMonitorMBean {
   private List<String> _disabledPartitions;
   private boolean _isUp;
   private boolean _isEnabled;
+  private long _totalMessageReceived;
 
   /**
    * Initialize the bean
@@ -50,6 +51,7 @@ public class InstanceMonitor implements InstanceMonitorMBean {
     _disabledPartitions = Collections.emptyList();
     _isUp = false;
     _isEnabled = false;
+    _totalMessageReceived = 0;
   }
 
   @Override
@@ -68,6 +70,11 @@ public class InstanceMonitor implements InstanceMonitorMBean {
     return _isEnabled ? 1 : 0;
   }
 
+  @Override
+  public long getTotalMessageReceived() {
+    return _totalMessageReceived;
+  }
+
   /**
    * Get all the tags currently on this instance
    * @return list of tags
@@ -121,4 +128,12 @@ public class InstanceMonitor implements InstanceMonitorMBean {
     _isEnabled = isEnabled;
   }
 
+  /**
+   * Update the number of messages received by this instance
+   * @param messageReceived number of additional messages received
+   */
+  public synchronized void updateMessageCount(long messageReceived) {
+    _totalMessageReceived += messageReceived;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/a9267e55/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
index f148700..4d949b1 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
@@ -36,4 +36,10 @@ public interface InstanceMonitorMBean extends SensorNameProvider {
    * @return 1 if enabled, 0 if disabled
    */
   public long getEnabled();
+
+  /**
+   * Get the total number of messages received by this instance
+   * @return The total number of messages sent to this instance
+   */
+  public long getTotalMessageReceived();
 }


[5/9] helix git commit: [HELIX-645] Fix Task State Model INIT priority number

Posted by lx...@apache.org.
[HELIX-645] Fix Task State Model INIT priority number
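
Background note (not part of the commit message): in Java a leading zero makes an integer
literal octal, so the previous value 01 is simply 1 and presumably collided with the priority
already assigned to RUNNING; the diff below registers INIT with priority 0 instead. A tiny
stand-alone illustration of the literal pitfall:

public class OctalLiteralDemo {
  public static void main(String[] args) {
    int initPriority = 01;   // leading zero -> octal literal, value 1 (what TaskSMD used before)
    int runningPriority = 1; // priority given to RUNNING
    // Prints "true": INIT and RUNNING ended up with the same priority number.
    System.out.println(initPriority == runningPriority);
  }
}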


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1563338e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1563338e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1563338e

Branch: refs/heads/helix-0.6.x
Commit: 1563338ee5488f5f1e563059ee127edd1ac350c6
Parents: 44209b8
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Dec 15 16:49:25 2016 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Dec 15 16:49:25 2016 -0800

----------------------------------------------------------------------
 helix-core/src/main/java/org/apache/helix/model/TaskSMD.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1563338e/helix-core/src/main/java/org/apache/helix/model/TaskSMD.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/TaskSMD.java b/helix-core/src/main/java/org/apache/helix/model/TaskSMD.java
index fd8b585..ac16a58 100644
--- a/helix-core/src/main/java/org/apache/helix/model/TaskSMD.java
+++ b/helix-core/src/main/java/org/apache/helix/model/TaskSMD.java
@@ -48,7 +48,7 @@ public final class TaskSMD extends StateModelDefinition {
     builder.initialState(TaskPartitionState.INIT.name());
 
     // add states
-    builder.addState(TaskPartitionState.INIT.name(), 01);
+    builder.addState(TaskPartitionState.INIT.name(), 0);
     builder.addState(TaskPartitionState.RUNNING.name(), 1);
     builder.addState(TaskPartitionState.STOPPED.name(), 2);
     builder.addState(TaskPartitionState.COMPLETED.name(), 3);


[6/9] helix git commit: [HELIX-646] DeleteJob from a recurrent job queue should not throw an exception if the last scheduled queue does not exist

Posted by lx...@apache.org.
[HELIX-646] DeleteJob from a recurrent job queue should not throw an exception if the last scheduled queue does not exist

When a job is deleted from a recurrent job queue, it is removed both from the last scheduled workflow and from the recurrent queue template. The last scheduled workflow may not have started yet, or may have already expired, so a failure to delete the job from it should be ignored.
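
For illustration only (not part of this commit): a minimal sketch of the caller-facing effect,
assuming an already-connected HelixManager; the queue name "NightlyQueue" and job name
"BackupJob" are hypothetical.

import org.apache.helix.HelixManager;
import org.apache.helix.task.TaskDriver;

public class DeleteJobFromRecurrentQueue {
  public static void deleteNightlyJob(HelixManager manager) {
    TaskDriver driver = new TaskDriver(manager);
    // With this change, if the last scheduled workflow of the recurrent queue has not
    // started yet or has already expired, deleting the job from it is silently skipped
    // and only the recurrent template is updated, instead of throwing
    // IllegalArgumentException.
    driver.deleteJob("NightlyQueue", "BackupJob");
  }
}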


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a8d54e32
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a8d54e32
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a8d54e32

Branch: refs/heads/helix-0.6.x
Commit: a8d54e32cb74d6b5d9702d4d27005498b8dc43e0
Parents: 1563338
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Dec 15 16:57:19 2016 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Dec 15 16:57:19 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java    | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/a8d54e32/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index da861f5..4fd3966 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -368,7 +368,7 @@ public class TaskDriver {
       String lastScheduledQueue = wCtx.getLastScheduledSingleWorkflow();
 
       // delete the current scheduled one
-      deleteJobFromScheduledQueue(lastScheduledQueue, jobName);
+      deleteJobFromScheduledQueue(lastScheduledQueue, jobName, true);
 
       // Remove the job from the original queue template's DAG
       removeJobFromDag(queueName, jobName);
@@ -383,7 +383,7 @@ public class TaskDriver {
               .join(TaskConstants.REBALANCER_CONTEXT_ROOT, namespacedJobName);
       _propertyStore.remove(jobPropertyPath, AccessOption.PERSISTENT);
     } else {
-      deleteJobFromScheduledQueue(queueName, jobName);
+      deleteJobFromScheduledQueue(queueName, jobName, false);
     }
   }
 
@@ -393,12 +393,18 @@ public class TaskDriver {
    * @param queueName
    * @param jobName
    */
-  private void deleteJobFromScheduledQueue(final String queueName, final String jobName) {
-    WorkflowConfig workflowCfg =
-        TaskUtil.getWorkflowCfg(_accessor, queueName);
+  private void deleteJobFromScheduledQueue(final String queueName, final String jobName,
+      boolean isRecurrent) {
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_accessor, queueName);
 
     if (workflowCfg == null) {
-      throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+      // When trying to delete a job from a recurrent queue, the last scheduled workflow may
+      // not have started yet or may have already finished, so there may not be a workflow config.
+      if (isRecurrent) {
+        return;
+      } else {
+        throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+      }
     }
 
     WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queueName);


[9/9] helix git commit: [HELIX-650] Add StateTransitionTimeoutConfig and expose API to set state transition timeout

Posted by lx...@apache.org.
[HELIX-650] Add StateTransitionTimeoutConfig and expose API to set state transition timeout

1. Add StateTransitionTimeoutConfig for state transition properties, such as timeout.
2. Add a new API for setting the state transition timeout.
3. Add logic in message generation to set the timeout on the message in a backward-compatible way.
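
For illustration only (not part of this commit, but mirroring the included
TestStateTransitionTimeoutWithResource test): a minimal sketch of setting a per-resource
state transition timeout, assuming a connected HelixDataAccessor and a resource named
"TestDB" (both hypothetical).

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.config.StateTransitionTimeoutConfig;
import org.apache.helix.model.ResourceConfig;

public class SetStateTransitionTimeout {
  public static void setSlaveToMasterTimeout(HelixDataAccessor accessor) {
    StateTransitionTimeoutConfig timeoutConfig =
        new StateTransitionTimeoutConfig(new ZNRecord("TestDB"));
    // 300 ms timeout for the SLAVE->MASTER transition; "*" wildcards are also accepted
    // for the from/to states.
    timeoutConfig.setStateTransitionTimeout("SLAVE", "MASTER", 300);

    ResourceConfig resourceConfig = new ResourceConfig("TestDB");
    resourceConfig.setStateTransitionTimeoutConfig(timeoutConfig);

    // Persist the resource config so MessageGenerationPhase picks up the timeout.
    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
    accessor.setProperty(keyBuilder.resourceConfig("TestDB"), resourceConfig);
  }
}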


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c8c67740
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c8c67740
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c8c67740

Branch: refs/heads/helix-0.6.x
Commit: c8c677405e6d7d9e9d67594fbe1b0efda455ac2f
Parents: 4c3fc7f
Author: Junkai Xue <jx...@linkedin.com>
Authored: Fri Dec 16 15:39:57 2016 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Fri Dec 16 15:50:18 2016 -0800

----------------------------------------------------------------------
 .../config/StateTransitionTimeoutConfig.java    | 130 +++++++++++++
 .../stages/MessageGenerationPhase.java          |  77 +++++---
 .../java/org/apache/helix/model/Message.java    |   2 +-
 .../org/apache/helix/model/ResourceConfig.java  |  18 +-
 .../monitoring/mbeans/ClusterStatusMonitor.java |   5 +-
 .../java/org/apache/helix/task/TaskDriver.java  |   6 +-
 .../java/org/apache/helix/task/TaskUtil.java    |   2 +-
 .../TestStateTransitionTimeoutWithResource.java | 194 +++++++++++++++++++
 .../model/TestStateTransitionProperty.java      |  40 ++++
 9 files changed, 438 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c8c67740/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionTimeoutConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionTimeoutConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionTimeoutConfig.java
new file mode 100644
index 0000000..d39f466
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionTimeoutConfig.java
@@ -0,0 +1,130 @@
+package org.apache.helix.api.config;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+
+public class StateTransitionTimeoutConfig {
+  public enum StateTransitionTimeoutProperty {
+    /**
+     * The timeout for a state transition
+     */
+    TIMEOUT
+  }
+
+  private final String _resource;
+  private final Map<String, String> _timeoutMap;
+
+  public StateTransitionTimeoutConfig(ZNRecord record) {
+    _resource = record.getId();
+    if (record.getMapFields().containsKey(StateTransitionTimeoutProperty.TIMEOUT.name())) {
+      _timeoutMap = record.getMapField(StateTransitionTimeoutProperty.TIMEOUT.name());
+    } else {
+      _timeoutMap = new HashMap<String, String>();
+    }
+  }
+
+  /**
+   * Set the state transition timeout for the given resource.
+   * Does not apply to Workflows and Jobs
+   * @param from          The from state
+   * @param to            The to state
+   * @param timeout       The timeout in milliseconds
+   */
+  public void setStateTransitionTimeout(String from, String to, int timeout) {
+    setStateTransitionTimeout(null, from, to, timeout);
+  }
+
+  /**
+   * Set the state transition timeout for a general condition.
+   * Does not apply to Workflows and Jobs
+   * @param partitionName The partition the timeout applies to
+   * @param from          The from state
+   * @param to            The to state
+   * @param timeout       The timeout in milliseconds
+   */
+  private void setStateTransitionTimeout(String partitionName, String from, String to,
+      int timeout) {
+    if (partitionName != null) {
+      _timeoutMap.put(partitionName, String.valueOf(timeout));
+    } else {
+      _timeoutMap.put(String.format("%s.%s", from, to), String.valueOf(timeout));
+    }
+  }
+
+  /**
+   * Get the state transition timeout for the given partition.
+   * Does not apply to Workflows and Jobs
+   * @param partitionName The partition the timeout applies to
+   * @param from          The from state
+   * @param to            The to state
+   * @return              The timeout in milliseconds, or -1 if no timeout matches.
+   */
+  public int getStateTransitionTimeout(String partitionName, String from, String to) {
+    if (partitionName != null && _timeoutMap.containsKey(partitionName)) {
+      return Integer.parseInt(_timeoutMap.get(partitionName));
+    } else if (_timeoutMap.containsKey(String.format("%s.%s", from, to))) {
+      return Integer.parseInt(_timeoutMap.get(String.format("%s.%s", from, to)));
+    } else if (_timeoutMap.containsKey(String.format("*.%s", to))) {
+      return Integer.parseInt(_timeoutMap.get(String.format("*.%s", to)));
+    } else if (_timeoutMap.containsKey(String.format("%s.*", from))) {
+      return Integer.parseInt(_timeoutMap.get(String.format("%s.*", from)));
+    } else if (_timeoutMap.containsKey("*.*")) {
+      return Integer.parseInt(_timeoutMap.get("*.*"));
+    }
+    return -1;
+  }
+
+  /**
+   * Get the state transition timeout for the given state pair.
+   * Does not apply to Workflows and Jobs
+   * @param from          The from state
+   * @param to            The to state
+   * @return              The timeout in milliseconds, or -1 if no timeout matches.
+   */
+  public int getStateTransitionTimeout(String from, String to) {
+    return getStateTransitionTimeout(null, from, to);
+  }
+
+  public Map<String, String> getTimeoutMap() {
+    return _timeoutMap;
+  }
+
+  public String getResource() {
+    return _resource;
+  }
+
+  /**
+   * Get StateTransitionTimeoutConfig from ZNRecord instead of creating a new
+   * StateTransitionTimeoutConfig object.
+   * @param record The ZNRecord to extract StateTransitionTimeoutConfig
+   * @return       A StateTransitionTimeoutConfig if the ZNRecord contains a timeout
+   *               setting, otherwise null.
+   */
+  public static StateTransitionTimeoutConfig fromRecord(ZNRecord record) {
+    return record.getMapFields()
+        .containsKey(StateTransitionTimeoutConfig.StateTransitionTimeoutProperty.TIMEOUT.name())
+        ? new StateTransitionTimeoutConfig(record)
+        : null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c8c67740/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 2e919f8..63096bb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -26,17 +26,19 @@ import java.util.Map;
 import java.util.UUID;
 
 import org.apache.helix.HelixManager;
+import org.apache.helix.api.config.StateTransitionTimeoutConfig;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.helix.model.Message.MessageType;
 import org.apache.log4j.Logger;
 
 /**
@@ -136,32 +138,13 @@ public class MessageGenerationPhase extends AbstractBaseStage {
                     idealState.getRecord().getMapField(partition.getPartitionName()));
               }
             }
-            // Set timeout of needed
-            String stateTransition =
-                currentState + "-" + nextState + "_" + Message.Attributes.TIMEOUT;
-            if (idealState != null) {
-              String timeOutStr = idealState.getRecord().getSimpleField(stateTransition);
-              if (timeOutStr == null
-                  && idealState.getStateModelDefRef().equalsIgnoreCase(
-                      DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-                // scheduled task queue
-                if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
-                  timeOutStr =
-                      idealState.getRecord().getMapField(partition.getPartitionName())
-                          .get(Message.Attributes.TIMEOUT.toString());
-                }
-              }
-              if (timeOutStr != null) {
-                try {
-                  int timeout = Integer.parseInt(timeOutStr);
-                  if (timeout > 0) {
-                    message.setExecutionTimeout(timeout);
-                  }
-                } catch (Exception e) {
-                  logger.error("", e);
-                }
-              }
+
+            int timeout = getTimeOut(cache.getResourceConfig(resourceName), currentState, nextState,
+                idealState, partition);
+            if (timeout > 0) {
+              message.setExecutionTimeout(timeout);
             }
+
             message.getRecord().setSimpleField("ClusterEventName", event.getName());
             // output.addMessage(resourceName, partition, message);
             if (!messageMap.containsKey(desiredState)) {
@@ -213,4 +196,44 @@ public class MessageGenerationPhase extends AbstractBaseStage {
 
     return message;
   }
+
+  private int getTimeOut(ResourceConfig resourceConfig, String currentState, String nextState,
+      IdealState idealState, Partition partition) {
+    // Set timeout if needed
+    int timeout = -1;
+    if (resourceConfig != null) {
+      // Set timeout once ResourceConfig set
+      StateTransitionTimeoutConfig stateTransitionTimeoutConfig =
+          resourceConfig.getStateTransitionTimeoutConfig();
+      timeout = stateTransitionTimeoutConfig != null ? stateTransitionTimeoutConfig
+          .getStateTransitionTimeout(currentState, nextState) : -1;
+
+    }
+
+    if (timeout <= 0) {
+      String timeOutStr = null;
+      // Check IdealState whether has timeout set
+      if (idealState != null) {
+        String stateTransition = currentState + "-" + nextState + "_" + Message.Attributes.TIMEOUT;
+        timeOutStr = idealState.getRecord().getSimpleField(stateTransition);
+        if (timeOutStr == null && idealState.getStateModelDefRef()
+            .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+          // scheduled task queue
+          if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
+            timeOutStr = idealState.getRecord().getMapField(partition.getPartitionName())
+                .get(Message.Attributes.TIMEOUT.toString());
+          }
+        }
+      }
+      if (timeOutStr != null) {
+        try {
+          timeout = Integer.parseInt(timeOutStr);
+        } catch (Exception e) {
+          logger.error("", e);
+        }
+      }
+    }
+
+    return timeout;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c8c67740/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 9fed87b..31f50ab 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -602,7 +602,7 @@ public class Message extends HelixProperty {
 
   /**
    * Get the value of an attribute
-   * @param attr {@link Attribute}
+   * @param attr {@link Attributes}
    * @return attribute value
    */
   public String getAttribute(Attributes attr) {

http://git-wip-us.apache.org/repos/asf/helix/blob/c8c67740/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
index e0e9f89..d9a925f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
@@ -19,13 +19,14 @@ package org.apache.helix.model;
  * under the License.
  */
 
+import java.util.Collections;
+import java.util.Map;
+
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.config.StateTransitionTimeoutConfig;
 import org.apache.log4j.Logger;
 
-import java.util.Collections;
-import java.util.Map;
-
 /**
  * Resource configurations
  */
@@ -84,6 +85,17 @@ public class ResourceConfig extends HelixProperty {
         .setBooleanField(ResourceConfigProperty.MONITORING_DISABLED.toString(), monitoringDisabled);
   }
 
+  // TODO: Move it to constructor and Builder when the logic merged in
+  public void setStateTransitionTimeoutConfig(
+      StateTransitionTimeoutConfig stateTransitionTimeoutConfig) {
+    putMapConfig(StateTransitionTimeoutConfig.StateTransitionTimeoutProperty.TIMEOUT.name(),
+        stateTransitionTimeoutConfig.getTimeoutMap());
+  }
+
+  public StateTransitionTimeoutConfig getStateTransitionTimeoutConfig() {
+    return StateTransitionTimeoutConfig.fromRecord(_record);
+  }
+
   /**
    * Put a set of simple configs.
    *

http://git-wip-us.apache.org/repos/asf/helix/blob/c8c67740/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 3ce9a65..3f1aca4 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -429,7 +429,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
 
     Map<String, WorkflowConfig> workflowConfigMap = driver.getWorkflows();
     for (String workflow : workflowConfigMap.keySet()) {
-      if (workflowConfigMap.get(workflow).isRecurring()) {
+      if (workflowConfigMap.get(workflow).isRecurring() || workflow.isEmpty()) {
         continue;
       }
       WorkflowContext workflowContext = driver.getWorkflowContext(workflow);
@@ -473,6 +473,9 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       jobMonitor.resetJobGauge();
     }
     for (String workflow : driver.getWorkflows().keySet()) {
+      if (workflow.isEmpty()) {
+        continue;
+      }
       Set<String> allJobs = driver.getWorkflowConfig(workflow).getJobDag().getAllNodes();
       WorkflowContext workflowContext = driver.getWorkflowContext(workflow);
       for (String job : allJobs) {

http://git-wip-us.apache.org/repos/asf/helix/blob/c8c67740/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 97d1067..1020cc3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -202,7 +202,7 @@ public class TaskDriver {
     flow.validate();
 
     // first, add workflow config.
-    if (!TaskUtil.setResouceConfig(_accessor, flow.getName(),
+    if (!TaskUtil.setResourceConfig(_accessor, flow.getName(),
         new WorkflowConfig(flow.getWorkflowConfig(), flow.getName()))) {
       LOG.error("Failed to add workflow configuration for workflow " + flow.getName());
     }
@@ -257,7 +257,7 @@ public class TaskDriver {
           "Workflow " + workflow + " is terminable, not allow to change its configuration!");
     }
 
-    if (!TaskUtil.setResouceConfig(_accessor, workflow, newWorkflowConfig)) {
+    if (!TaskUtil.setResourceConfig(_accessor, workflow, newWorkflowConfig)) {
       LOG.error("Failed to update workflow configuration for workflow " + workflow);
     }
 
@@ -676,7 +676,7 @@ public class TaskDriver {
 
     // Set the job configuration
     JobConfig newJobCfg = new JobConfig(jobName, jobConfig);
-    if (!TaskUtil.setResouceConfig(_accessor, jobName, newJobCfg)) {
+    if (!TaskUtil.setResourceConfig(_accessor, jobName, newJobCfg)) {
       LOG.error("Failed to add job configuration for job " + jobName);
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/c8c67740/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index f29f2d0..d765cd5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -120,7 +120,7 @@ public class TaskUtil {
    * @param resourceConfig  The resource config to be set
    * @return                True if set successfully, otherwise false
    */
-  protected static boolean setResouceConfig(HelixDataAccessor accessor, String resource,
+  protected static boolean setResourceConfig(HelixDataAccessor accessor, String resource,
       ResourceConfig resourceConfig) {
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     return accessor.setProperty(keyBuilder.resourceConfig(resource), resourceConfig);

http://git-wip-us.apache.org/repos/asf/helix/blob/c8c67740/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeoutWithResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeoutWithResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeoutWithResource.java
new file mode 100644
index 0000000..f8c0135
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeoutWithResource.java
@@ -0,0 +1,194 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.config.StateTransitionTimeoutConfig;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
+import org.apache.helix.mock.participant.MockMSStateModel;
+import org.apache.helix.mock.participant.MockTransition;
+import org.apache.helix.mock.participant.SleepTransition;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestStateTransitionTimeoutWithResource extends ZkStandAloneCMTestBase {
+  private static Logger LOG = Logger.getLogger(TestStateTransitionTimeout.class);
+  private HelixManager _manager;
+
+  @Override
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(ZK_ADDR);
+
+    // setup storage cluster
+    _setupTool.addCluster(CLUSTER_NAME, true);
+    _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL);
+
+    for (int i = 0; i < NODE_NR; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 3);
+
+    // Set the timeout values
+    StateTransitionTimeoutConfig stateTransitionTimeoutConfig =
+        new StateTransitionTimeoutConfig(new ZNRecord(TEST_DB));
+    stateTransitionTimeoutConfig.setStateTransitionTimeout("SLAVE", "MASTER", 300);
+    ResourceConfig resourceConfig = new ResourceConfig(TEST_DB);
+    resourceConfig.setStateTransitionTimeoutConfig(stateTransitionTimeoutConfig);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+    PropertyKey.Builder keyBuilder = _manager.getHelixDataAccessor().keyBuilder();
+    _manager.getHelixDataAccessor().setProperty(keyBuilder.resourceConfig(TEST_DB), resourceConfig);
+
+  }
+
+  @StateModelInfo(initialState = "OFFLINE", states = {
+      "MASTER", "SLAVE", "ERROR"
+  })
+  public static class TimeOutStateModel extends MockMSStateModel {
+    boolean _sleep = false;
+    StateTransitionError _error;
+    int _errorCallcount = 0;
+
+    public TimeOutStateModel(MockTransition transition, boolean sleep) {
+      super(transition);
+      _sleep = sleep;
+    }
+
+    @Transition(to = "MASTER", from = "SLAVE")
+    public void onBecomeMasterFromSlave(Message message, NotificationContext context)
+        throws InterruptedException {
+      LOG.info("Become MASTER from SLAVE");
+      if (_transition != null && _sleep) {
+        _transition.doTransition(message, context);
+      }
+    }
+
+    @Override
+    public void rollbackOnError(Message message, NotificationContext context,
+        StateTransitionError error) {
+      _error = error;
+      _errorCallcount++;
+    }
+  }
+
+  public static class SleepStateModelFactory extends StateModelFactory<TimeOutStateModel> {
+    Set<String> partitionsToSleep = new HashSet<String>();
+    int _sleepTime;
+
+    public SleepStateModelFactory(int sleepTime) {
+      _sleepTime = sleepTime;
+    }
+
+    public void setPartitions(Collection<String> partitions) {
+      partitionsToSleep.addAll(partitions);
+    }
+
+    public void addPartition(String partition) {
+      partitionsToSleep.add(partition);
+    }
+
+    @Override
+    public TimeOutStateModel createNewStateModel(String resource, String stateUnitKey) {
+      return new TimeOutStateModel(new SleepTransition(_sleepTime),
+          partitionsToSleep.contains(stateUnitKey));
+    }
+  }
+
+  @Test
+  public void testStateTransitionTimeOut() throws Exception {
+    Map<String, SleepStateModelFactory> factories = new HashMap<String, SleepStateModelFactory>();
+    IdealState idealState =
+        _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
+    for (int i = 0; i < NODE_NR; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      SleepStateModelFactory factory = new SleepStateModelFactory(1000);
+      factories.put(instanceName, factory);
+      for (String p : idealState.getPartitionSet()) {
+        if (idealState.getPreferenceList(p).get(0).equals(instanceName)) {
+          factory.addPartition(p);
+        }
+      }
+
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      _participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave", factory);
+      _participants[i].syncStart();
+    }
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller =
+        new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+    HelixDataAccessor accessor = _participants[0].getHelixDataAccessor();
+
+    Builder kb = accessor.keyBuilder();
+    ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB));
+    for (String p : idealState.getPartitionSet()) {
+      String idealMaster = idealState.getPreferenceList(p).get(0);
+      Assert.assertTrue(ev.getStateMap(p).get(idealMaster).equals("ERROR"));
+
+      TimeOutStateModel model = factories.get(idealMaster).getStateModel(TEST_DB, p);
+      Assert.assertEquals(model._errorCallcount, 1);
+      Assert.assertEquals(model._error.getCode(), ErrorCode.TIMEOUT);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c8c67740/helix-core/src/test/java/org/apache/helix/model/TestStateTransitionProperty.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestStateTransitionProperty.java b/helix-core/src/test/java/org/apache/helix/model/TestStateTransitionProperty.java
new file mode 100644
index 0000000..86ab7fb
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/model/TestStateTransitionProperty.java
@@ -0,0 +1,40 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.config.StateTransitionTimeoutConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestStateTransitionProperty {
+
+  @Test
+  public void testTimeoutSetAndGet() {
+    StateTransitionTimeoutConfig stateTransitionTimeoutConfig = new StateTransitionTimeoutConfig(new ZNRecord("TEST"));
+    stateTransitionTimeoutConfig.setStateTransitionTimeout("MASTER", "SLAVE", 300);
+    Assert.assertEquals(stateTransitionTimeoutConfig.getStateTransitionTimeout("MASTER", "SLAVE"), 300);
+
+    stateTransitionTimeoutConfig.setStateTransitionTimeout("*", "MASTER", 500);
+    Assert.assertEquals(stateTransitionTimeoutConfig.getStateTransitionTimeout("OFFLINE", "MASTER"), 500);
+
+    Assert.assertEquals(stateTransitionTimeoutConfig.getStateTransitionTimeout("SLAVE", "OFFLINE"), -1);
+  }
+}


[7/9] helix git commit: [HELIX-648] Extend WorkflowConfig and JobConfig to ResourceConfig

Posted by lx...@apache.org.
[HELIX-648] Extend WorkflowConfig and JobConfig to ResourceConfig

WorkflowConfig and JobConfig are stored as ResourceConfig but still need extra conversion throughout the current codebase, so both configs now extend ResourceConfig.
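
For illustration only (not part of this commit): a minimal sketch of reading a job's stored
config back without manual conversion, now that JobConfig extends ResourceConfig. It assumes
a connected HelixDataAccessor and a namespaced job name (e.g. "MyQueue_MyJob", hypothetical).

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.task.JobConfig;

public class ReadJobConfig {
  public static JobConfig readJobConfig(HelixDataAccessor accessor, String namespacedJobName) {
    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
    // Job configs live under the resource config path; the stored property can be wrapped
    // directly with the new JobConfig(HelixProperty) constructor.
    HelixProperty property = accessor.getProperty(keyBuilder.resourceConfig(namespacedJobName));
    return property == null ? null : new JobConfig(property);
  }
}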


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1eb40c3f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1eb40c3f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1eb40c3f

Branch: refs/heads/helix-0.6.x
Commit: 1eb40c3ffb5191c94ec4055b692d2601723f91cf
Parents: a8d54e3
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Dec 15 18:20:03 2016 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Dec 15 18:20:03 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/HelixProperty.java    |   9 +
 .../org/apache/helix/model/ResourceConfig.java  |  34 ++
 .../java/org/apache/helix/task/JobConfig.java   | 321 ++++++++++++-------
 .../java/org/apache/helix/task/TaskDriver.java  |  24 +-
 .../java/org/apache/helix/task/TaskUtil.java    |  31 +-
 .../org/apache/helix/task/WorkflowConfig.java   | 230 +++++++------
 .../org/apache/helix/task/beans/JobBean.java    |   2 +
 .../task/TestTaskRebalancerParallel.java        |   2 +
 .../integration/task/TestTaskRetryDelay.java    |   2 +-
 .../integration/task/TestUserContentStore.java  |   6 +-
 10 files changed, 422 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1eb40c3f/helix-core/src/main/java/org/apache/helix/HelixProperty.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixProperty.java b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
index 6c52a47..390e8d7 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
@@ -59,6 +59,15 @@ public class HelixProperty {
   }
 
   /**
+   * Initialize the property with an existing ZNRecord with new record id
+   * @param record
+   * @param id
+   */
+  public HelixProperty(ZNRecord record, String id) {
+    _record = new ZNRecord(record, id);
+  }
+
+  /**
    * Get the property identifier
    * @return the property id
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/1eb40c3f/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
index d58126d..e0e9f89 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
@@ -58,6 +58,14 @@ public class ResourceConfig extends HelixProperty {
   }
 
   /**
+   * Instantiate with a pre-populated record with new record id
+   * @param record a ZNRecord corresponding to an instance configuration
+   * @param id     new ZNRecord ID
+   */
+  public ResourceConfig(ZNRecord record, String id) {
+    super(record, id);
+  }
+  /**
    * Get the value of DisableMonitoring set.
    *
    * @return the MonitoringDisabled is true or false
@@ -134,6 +142,32 @@ public class ResourceConfig extends HelixProperty {
     return getRecord().getMapField(configKey);
   }
 
+  /**
+   * Determine whether the given config key is in the simple config
+   * @param configKey The key to check whether exists
+   * @return True if exists, otherwise false
+   */
+  public boolean simpleConfigContains(String configKey) {
+    return getRecord().getSimpleFields().containsKey(configKey);
+  }
+
+  /**
+   * Determine whether the given config key is the map config
+   * @param configKey The key to check whether exists
+   * @return True if exists, otherwise false
+   */
+  public boolean mapConfigContains(String configKey) {
+    return getRecord().getMapFields().containsKey(configKey);
+  }
+
+  /**
+   * Get the stored map fields
+   * @return a map of map fields
+   */
+  public Map<String, Map<String, String>> getMapConfigs() {
+    return getRecord().getMapFields();
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (obj instanceof ResourceConfig) {

http://git-wip-us.apache.org/repos/asf/helix/blob/1eb40c3f/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index a966f35..b701623 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -20,27 +20,27 @@ package org.apache.helix.task;
  */
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.HelixProperty;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.task.beans.JobBean;
+import org.apache.helix.task.beans.TaskBean;
+
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.helix.task.beans.JobBean;
-import org.apache.helix.task.beans.TaskBean;
-import org.apache.helix.HelixProperty;
 
 /**
  * Provides a typed interface to job configurations.
  */
-// TODO: extends JobConfig from ResourceConfig
-public class JobConfig {
+public class JobConfig extends ResourceConfig {
 
   /**
    * Do not use this value directly, always use the get/set methods in JobConfig and JobConfig.Builder.
@@ -51,6 +51,10 @@ public class JobConfig {
      */
     WorkflowID,
     /**
+     * The name of the job
+     */
+    JobID,
+    /**
      * The assignment strategy of this job
      */
     AssignmentStrategy,
@@ -87,7 +91,7 @@ public class JobConfig {
     /**
      * The maximum number of times Helix will intentionally move a failing task
      */
-    MaxForcedReassignmentsPerTask,
+        MaxForcedReassignmentsPerTask,
     /**
      * The number of concurrent tasks that are allowed to run on an instance.
      */
@@ -123,7 +127,22 @@ public class JobConfig {
     /**
      * The instance group that task assign to
      */
-    InstanceGroupTag
+    InstanceGroupTag,
+
+    /**
+     * The job execution delay time
+     */
+    DelayTime,
+
+    /**
+     * The job execution start time
+     */
+    StartTime,
+
+    /**
+     * The expiration time for the job
+     */
+    Expiry
   }
 
   //Default property values
@@ -136,164 +155,203 @@ public class JobConfig {
   public static final boolean DEFAULT_DISABLE_EXTERNALVIEW = false;
   public static final boolean DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE = false;
   public static final int DEFAULT_NUMBER_OF_TASKS = 0;
+  public static final long DEFAULT_JOB_EXECUTION_START_TIME = -1L;
+  public static final long DEFAULT_Job_EXECUTION_DELAY_TIME = -1L;
 
-  private final String _workflow;
-  private final String _targetResource;
-  private final String _jobType;
-  private final String _instanceGroupTag;
-  private final List<String> _targetPartitions;
-  private final Set<String> _targetPartitionStates;
-  private final String _command;
-  private final Map<String, String> _jobCommandConfigMap;
-  private final long _timeoutPerTask;
-  private final int _numConcurrentTasksPerInstance;
-  private final int _maxAttemptsPerTask;
-  private final int _maxForcedReassignmentsPerTask;
-  private final int _failureThreshold;
-  private final long _retryDelay;
-  private final boolean _disableExternalView;
-  private final boolean _ignoreDependentJobFailure;
-  private final Map<String, TaskConfig> _taskConfigMap;
+  public JobConfig(HelixProperty property) {
+    super(property.getRecord());
+  }
+
+  public JobConfig(String jobId, JobConfig jobConfig) {
+    this(jobConfig.getWorkflow(), jobConfig.getTargetResource(), jobConfig.getTargetPartitions(),
+        jobConfig.getTargetPartitionStates(), jobConfig.getCommand(),
+        jobConfig.getJobCommandConfigMap(), jobConfig.getTimeoutPerTask(),
+        jobConfig.getNumConcurrentTasksPerInstance(), jobConfig.getMaxAttemptsPerTask(),
+        jobConfig.getMaxForcedReassignmentsPerTask(), jobConfig.getFailureThreshold(),
+        jobConfig.getTaskRetryDelay(), jobConfig.isDisableExternalView(),
+        jobConfig.isIgnoreDependentJobFailure(), jobConfig.getTaskConfigMap(),
+        jobConfig.getJobType(), jobConfig.getInstanceGroupTag(), jobConfig.getExecutionDelay(),
+        jobConfig.getExecutionStart(), jobId, jobConfig.getExpiry());
+  }
 
   private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
       Set<String> targetPartitionStates, String command, Map<String, String> jobCommandConfigMap,
       long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
       int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay,
       boolean disableExternalView, boolean ignoreDependentJobFailure,
-      Map<String, TaskConfig> taskConfigMap, String jobType, String instanceGroupTag) {
-    _workflow = workflow;
-    _targetResource = targetResource;
-    _targetPartitions = targetPartitions;
-    _targetPartitionStates = targetPartitionStates;
-    _command = command;
-    _jobCommandConfigMap = jobCommandConfigMap;
-    _timeoutPerTask = timeoutPerTask;
-    _numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
-    _maxAttemptsPerTask = maxAttemptsPerTask;
-    _maxForcedReassignmentsPerTask = maxForcedReassignmentsPerTask;
-    _failureThreshold = failureThreshold;
-    _retryDelay = retryDelay;
-    _disableExternalView = disableExternalView;
-    _ignoreDependentJobFailure = ignoreDependentJobFailure;
+      Map<String, TaskConfig> taskConfigMap, String jobType, String instanceGroupTag,
+      long executionDelay, long executionStart, String jobId, long expiry) {
+    super(jobId);
+    putSimpleConfig(JobConfigProperty.WorkflowID.name(), workflow);
+    putSimpleConfig(JobConfigProperty.JobID.name(), jobId);
+    if (command != null) {
+      putSimpleConfig(JobConfigProperty.Command.name(), command);
+    }
+    if (jobCommandConfigMap != null) {
+      String serializedConfig = TaskUtil.serializeJobCommandConfigMap(jobCommandConfigMap);
+      if (serializedConfig != null) {
+        putSimpleConfig(JobConfigProperty.JobCommandConfig.name(), serializedConfig);
+      }
+    }
+    if (targetResource != null) {
+      putSimpleConfig(JobConfigProperty.TargetResource.name(), targetResource);
+    }
+    if (targetPartitionStates != null) {
+      putSimpleConfig(JobConfigProperty.TargetPartitionStates.name(),
+          Joiner.on(",").join(targetPartitionStates));
+    }
+    if (targetPartitions != null) {
+      putSimpleConfig(JobConfigProperty.TargetPartitions.name(),
+          Joiner.on(",").join(targetPartitions));
+    }
+    if (retryDelay > 0) {
+      getRecord().setLongField(JobConfigProperty.TaskRetryDelay.name(), retryDelay);
+    }
+    if (executionDelay > 0) {
+      getRecord().setLongField(JobConfigProperty.DelayTime.name(), executionDelay);
+    }
+    if (executionStart > 0) {
+      getRecord().setLongField(JobConfigProperty.StartTime.name(), executionStart);
+    }
+    getRecord().setLongField(JobConfigProperty.TimeoutPerPartition.name(), timeoutPerTask);
+    getRecord().setIntField(JobConfigProperty.MaxAttemptsPerTask.name(), maxAttemptsPerTask);
+    getRecord().setIntField(JobConfigProperty.MaxForcedReassignmentsPerTask.name(),
+        maxForcedReassignmentsPerTask);
+    getRecord().setIntField(JobConfigProperty.FailureThreshold.name(), failureThreshold);
+    getRecord().setBooleanField(JobConfigProperty.DisableExternalView.name(), disableExternalView);
+    getRecord().setIntField(JobConfigProperty.ConcurrentTasksPerInstance.name(),
+        numConcurrentTasksPerInstance);
+    getRecord().setBooleanField(JobConfigProperty.IgnoreDependentJobFailure.name(),
+        ignoreDependentJobFailure);
+    if (jobType != null) {
+      putSimpleConfig(JobConfigProperty.JobType.name(), jobType);
+    }
+    if (instanceGroupTag != null) {
+      putSimpleConfig(JobConfigProperty.InstanceGroupTag.name(), instanceGroupTag);
+    }
     if (taskConfigMap != null) {
-      _taskConfigMap = taskConfigMap;
-    } else {
-      _taskConfigMap = Collections.emptyMap();
+      for (TaskConfig taskConfig : taskConfigMap.values()) {
+        putMapConfig(taskConfig.getId(), taskConfig.getConfigMap());
+      }
     }
-    _jobType = jobType;
-    _instanceGroupTag = instanceGroupTag;
+    if (expiry > 0) {
+      getRecord().setLongField(JobConfigProperty.Expiry.name(), expiry);
+    }
+    putSimpleConfig(ResourceConfigProperty.MONITORING_DISABLED.toString(),
+        String.valueOf(WorkflowConfig.DEFAULT_MONITOR_DISABLE));
   }
 
   public String getWorkflow() {
-    return _workflow == null ? Workflow.UNSPECIFIED : _workflow;
+    return simpleConfigContains(JobConfigProperty.WorkflowID.name())
+        ? getSimpleConfig(JobConfigProperty.WorkflowID.name())
+        : Workflow.UNSPECIFIED;
+  }
+
+  public String getJobId() {
+    return getSimpleConfig(JobConfigProperty.JobID.name());
   }
 
   public String getTargetResource() {
-    return _targetResource;
+    return getSimpleConfig(JobConfigProperty.TargetResource.name());
   }
 
   public List<String> getTargetPartitions() {
-    return _targetPartitions;
+    return simpleConfigContains(JobConfigProperty.TargetPartitions.name()) ? Arrays
+        .asList(getSimpleConfig(JobConfigProperty.TargetPartitions.name()).split(",")) : null;
   }
 
   public Set<String> getTargetPartitionStates() {
-    return _targetPartitionStates;
+    if (simpleConfigContains(JobConfigProperty.TargetPartitionStates.name())) {
+      return new HashSet<String>(Arrays
+          .asList(getSimpleConfig(JobConfigProperty.TargetPartitionStates.name()).split(",")));
+    }
+    return null;
   }
 
   public String getCommand() {
-    return _command;
+    return getSimpleConfig(JobConfigProperty.Command.name());
   }
 
   public Map<String, String> getJobCommandConfigMap() {
-    return _jobCommandConfigMap;
+    return simpleConfigContains(JobConfigProperty.JobCommandConfig.name())
+        ? TaskUtil
+        .deserializeJobCommandConfigMap(getSimpleConfig(JobConfigProperty.JobCommandConfig.name()))
+        : null;
   }
 
   public long getTimeoutPerTask() {
-    return _timeoutPerTask;
+    return getRecord()
+        .getLongField(JobConfigProperty.TimeoutPerPartition.name(), DEFAULT_TIMEOUT_PER_TASK);
   }
 
   public int getNumConcurrentTasksPerInstance() {
-    return _numConcurrentTasksPerInstance;
+    return getRecord().getIntField(JobConfigProperty.ConcurrentTasksPerInstance.name(),
+        DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE);
   }
 
   public int getMaxAttemptsPerTask() {
-    return _maxAttemptsPerTask;
+    return getRecord()
+        .getIntField(JobConfigProperty.MaxAttemptsPerTask.name(), DEFAULT_MAX_ATTEMPTS_PER_TASK);
   }
 
   public int getFailureThreshold() {
-    return _failureThreshold;
+    return getRecord()
+        .getIntField(JobConfigProperty.FailureThreshold.name(), DEFAULT_FAILURE_THRESHOLD);
   }
 
   public long getTaskRetryDelay() {
-    return _retryDelay;
+    return getRecord()
+        .getLongField(JobConfigProperty.TaskRetryDelay.name(), DEFAULT_TASK_RETRY_DELAY);
+  }
+
+  // Execution delay time will be ignored when it is a negative number
+  public long getExecutionDelay() {
+    return getRecord()
+        .getLongField(JobConfigProperty.DelayTime.name(), DEFAULT_Job_EXECUTION_DELAY_TIME);
+  }
+
+  public long getExecutionStart() {
+    return getRecord()
+        .getLongField(JobConfigProperty.StartTime.name(), DEFAULT_JOB_EXECUTION_START_TIME);
   }
 
   public boolean isDisableExternalView() {
-    return _disableExternalView;
+    return getRecord().getBooleanField(JobConfigProperty.DisableExternalView.name(),
+        DEFAULT_DISABLE_EXTERNALVIEW);
   }
 
-  public boolean isIgnoreDependentJobFailure() { return _ignoreDependentJobFailure; }
+  public boolean isIgnoreDependentJobFailure() {
+    return getRecord().getBooleanField(JobConfigProperty.IgnoreDependentJobFailure.name(),
+        DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE);
+  }
 
   public Map<String, TaskConfig> getTaskConfigMap() {
-    return _taskConfigMap;
+    Map<String, TaskConfig> taskConfigMap = new HashMap<String, TaskConfig>();
+    for (Map.Entry<String, Map<String, String>> entry : getMapConfigs().entrySet()) {
+      taskConfigMap
+          .put(entry.getKey(), new TaskConfig(null, entry.getValue(), entry.getKey(), null));
+    }
+    return taskConfigMap;
   }
 
   public TaskConfig getTaskConfig(String id) {
-    return _taskConfigMap.get(id);
+    return new TaskConfig(null, getMapConfig(id), id, null);
   }
 
   public Map<String, String> getResourceConfigMap() {
-    Map<String, String> cfgMap = new HashMap<String, String>();
-    cfgMap.put(JobConfigProperty.WorkflowID.name(), _workflow);
-    if (_command != null) {
-      cfgMap.put(JobConfigProperty.Command.name(), _command);
-    }
-    if (_jobCommandConfigMap != null) {
-      String serializedConfig = TaskUtil.serializeJobCommandConfigMap(_jobCommandConfigMap);
-      if (serializedConfig != null) {
-        cfgMap.put(JobConfigProperty.JobCommandConfig.name(), serializedConfig);
-      }
-    }
-    if (_targetResource != null) {
-      cfgMap.put(JobConfigProperty.TargetResource.name(), _targetResource);
-    }
-    if (_targetPartitionStates != null) {
-      cfgMap.put(JobConfigProperty.TargetPartitionStates.name(),
-          Joiner.on(",").join(_targetPartitionStates));
-    }
-    if (_targetPartitions != null) {
-      cfgMap
-          .put(JobConfigProperty.TargetPartitions.name(), Joiner.on(",").join(_targetPartitions));
-    }
-    if (_retryDelay > 0) {
-      cfgMap.put(JobConfigProperty.TaskRetryDelay.name(), "" + _retryDelay);
-    }
-    cfgMap.put(JobConfigProperty.TimeoutPerPartition.name(), "" + _timeoutPerTask);
-    cfgMap.put(JobConfigProperty.MaxAttemptsPerTask.name(), "" + _maxAttemptsPerTask);
-    cfgMap.put(JobConfigProperty.MaxForcedReassignmentsPerTask.name(),
-        "" + _maxForcedReassignmentsPerTask);
-    cfgMap.put(JobConfigProperty.FailureThreshold.name(), "" + _failureThreshold);
-    cfgMap.put(JobConfigProperty.DisableExternalView.name(),
-        Boolean.toString(_disableExternalView));
-    cfgMap.put(JobConfigProperty.ConcurrentTasksPerInstance.name(),
-        "" + _numConcurrentTasksPerInstance);
-    cfgMap.put(JobConfigProperty.IgnoreDependentJobFailure.name(),
-        Boolean.toString(_ignoreDependentJobFailure));
-    if (_jobType != null) {
-      cfgMap.put(JobConfigProperty.JobType.name(), _jobType);
-    }
-    if (_instanceGroupTag != null) {
-      cfgMap.put(JobConfigProperty.InstanceGroupTag.name(), _instanceGroupTag);
-    }
-    return cfgMap;
+    return getSimpleConfigs();
   }
 
   public String getJobType() {
-    return _jobType;
+    return getSimpleConfig(JobConfigProperty.JobType.name());
   }
 
   public String getInstanceGroupTag() {
-    return _instanceGroupTag;
+    return getSimpleConfig(JobConfigProperty.InstanceGroupTag.name());
+  }
+
+  public Long getExpiry() {
+    return getRecord().getLongField(JobConfigProperty.Expiry.name(), WorkflowConfig.DEFAULT_EXPIRY);
   }
 
   public static JobConfig fromHelixProperty(HelixProperty property)
@@ -306,9 +364,8 @@ public class JobConfig {
    * A builder for {@link JobConfig}. Validates the configurations.
    */
   public static class Builder {
-    private final String NUMBER_OF_TASKS = "NumberOfTasks";
-
     private String _workflow;
+    private String _jobId;
     private String _targetResource;
     private String _jobType;
     private String _instanceGroupTag;
@@ -323,25 +380,31 @@ public class JobConfig {
     private int _maxForcedReassignmentsPerTask = DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK;
     private int _failureThreshold = DEFAULT_FAILURE_THRESHOLD;
     private long _retryDelay = DEFAULT_TASK_RETRY_DELAY;
+    private long _executionStart = DEFAULT_JOB_EXECUTION_START_TIME;
+    private long _executionDelay = DEFAULT_Job_EXECUTION_DELAY_TIME;
+    private long _expiry = WorkflowConfig.DEFAULT_EXPIRY;
     private boolean _disableExternalView = DEFAULT_DISABLE_EXTERNALVIEW;
     private boolean _ignoreDependentJobFailure = DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
     private int _numberOfTasks = DEFAULT_NUMBER_OF_TASKS;
 
     public JobConfig build() {
-      validate();
-
-      if (_taskConfigMap.isEmpty()) {
+      if (_targetResource == null && _taskConfigMap.isEmpty()) {
         for (int i = 0; i < _numberOfTasks; i++) {
           TaskConfig taskConfig = new TaskConfig(null, null);
           _taskConfigMap.put(taskConfig.getId(), taskConfig);
         }
       }
+      if (_jobId == null) {
+        _jobId = "";
+      }
+
+      validate();
 
       return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
           _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
           _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay,
           _disableExternalView, _ignoreDependentJobFailure, _taskConfigMap, _jobType,
-          _instanceGroupTag);
+          _instanceGroupTag, _executionDelay, _executionStart, _jobId, _expiry);
     }
 
     /**
@@ -355,6 +418,9 @@ public class JobConfig {
       if (cfg.containsKey(JobConfigProperty.WorkflowID.name())) {
         b.setWorkflow(cfg.get(JobConfigProperty.WorkflowID.name()));
       }
+      if (cfg.containsKey(JobConfigProperty.JobID.name())) {
+        b.setJobId(cfg.get(JobConfigProperty.JobID.name()));
+      }
       if (cfg.containsKey(JobConfigProperty.TargetResource.name())) {
         b.setTargetResource(cfg.get(JobConfigProperty.TargetResource.name()));
       }
@@ -391,6 +457,12 @@ public class JobConfig {
       if (cfg.containsKey(JobConfigProperty.TaskRetryDelay.name())) {
         b.setTaskRetryDelay(Long.parseLong(cfg.get(JobConfigProperty.TaskRetryDelay.name())));
       }
+      if (cfg.containsKey(JobConfigProperty.DelayTime.name())) {
+        b.setExecutionDelay(Long.parseLong(cfg.get(JobConfigProperty.DelayTime.name())));
+      }
+      if (cfg.containsKey(JobConfigProperty.StartTime.name())) {
+        b.setExecutionStart(Long.parseLong(cfg.get(JobConfigProperty.StartTime.name())));
+      }
       if (cfg.containsKey(JobConfigProperty.DisableExternalView.name())) {
         b.setDisableExternalView(
             Boolean.valueOf(cfg.get(JobConfigProperty.DisableExternalView.name())));
@@ -405,6 +477,9 @@ public class JobConfig {
       if (cfg.containsKey(JobConfigProperty.InstanceGroupTag.name())) {
         b.setInstanceGroupTag(cfg.get(JobConfigProperty.InstanceGroupTag.name()));
       }
+      if (cfg.containsKey(JobConfigProperty.Expiry.name())) {
+        b.setExpiry(Long.valueOf(cfg.get(JobConfigProperty.Expiry.name())));
+      }
       return b;
     }
 
@@ -413,6 +488,11 @@ public class JobConfig {
       return this;
     }
 
+    public Builder setJobId(String v) {
+      _jobId = v;
+      return this;
+    }
+
     public Builder setTargetResource(String v) {
       _targetResource = v;
       return this;
@@ -475,6 +555,16 @@ public class JobConfig {
       return this;
     }
 
+    public Builder setExecutionDelay(long v) {
+      _executionDelay = v;
+      return this;
+    }
+
+    public Builder setExecutionStart(long v) {
+      _executionStart = v;
+      return this;
+    }
+
     public Builder setDisableExternalView(boolean disableExternalView) {
       _disableExternalView = disableExternalView;
       return this;
@@ -509,6 +599,11 @@ public class JobConfig {
       return this;
     }
 
+    public Builder setExpiry(Long expiry) {
+      _expiry = expiry;
+      return this;
+    }
+
     private void validate() {
       if (_taskConfigMap.isEmpty() && _targetResource == null) {
         throw new IllegalArgumentException(
@@ -578,7 +673,9 @@ public class JobConfig {
           .setTimeoutPerTask(jobBean.timeoutPerPartition)
           .setFailureThreshold(jobBean.failureThreshold).setTaskRetryDelay(jobBean.taskRetryDelay)
           .setDisableExternalView(jobBean.disableExternalView)
-          .setIgnoreDependentJobFailure(jobBean.ignoreDependentJobFailure).setNumberOfTasks(jobBean.numberOfTasks);
+          .setIgnoreDependentJobFailure(jobBean.ignoreDependentJobFailure)
+          .setNumberOfTasks(jobBean.numberOfTasks).setExecutionDelay(jobBean.executionDelay)
+          .setExecutionStart(jobBean.executionStart);
 
       if (jobBean.jobCommandConfigMap != null) {
         b.setJobCommandConfigMap(jobBean.jobCommandConfigMap);
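
For reference, a minimal sketch of how the new JobConfig.Builder fields added above
might be used. The workflow, job, and resource names are placeholders, and only the
builder methods visible in this diff (plus the pre-existing setCommand and
setTargetResource) are assumed:

    JobConfig.Builder jobBuilder = new JobConfig.Builder()
        .setWorkflow("myWorkflow")                      // stored as JobConfigProperty.WorkflowID
        .setJobId("myWorkflow_myJob")                   // new JobID simple field
        .setCommand("Reindex")
        .setTargetResource("MyDB")                      // satisfies validate() without task configs
        .setExecutionDelay(60000L)                      // DelayTime in ms
        .setExecutionStart(System.currentTimeMillis())  // StartTime as an absolute timestamp
        .setExpiry(12 * 60 * 60 * 1000L);               // overrides WorkflowConfig.DEFAULT_EXPIRY
    JobConfig jobConfig = jobBuilder.build();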

http://git-wip-us.apache.org/repos/asf/helix/blob/1eb40c3f/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 4fd3966..97d1067 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -202,8 +202,10 @@ public class TaskDriver {
     flow.validate();
 
     // first, add workflow config.
-    _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flow.getName()),
-        flow.getWorkflowConfig().getResourceConfigMap());
+    if (!TaskUtil.setResouceConfig(_accessor, flow.getName(),
+        new WorkflowConfig(flow.getWorkflowConfig(), flow.getName()))) {
+      LOG.error("Failed to add workflow configuration for workflow " + flow.getName());
+    }
 
     // then add all job configs.
     for (String job : flow.getJobConfigs().keySet()) {
@@ -255,8 +257,9 @@ public class TaskDriver {
           "Workflow " + workflow + " is terminable, not allow to change its configuration!");
     }
 
-    _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, workflow),
-        newWorkflowConfig.getResourceConfigMap());
+    if (!TaskUtil.setResouceConfig(_accessor, workflow, newWorkflowConfig)) {
+      LOG.error("Failed to update workflow configuration for workflow " + workflow);
+    }
 
     TaskUtil.invokeRebalance(_accessor, workflow);
   }
@@ -672,17 +675,8 @@ public class TaskDriver {
     LOG.info("Add job configuration " + jobName);
 
     // Set the job configuration
-    PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
-    ResourceConfig resourceConfig = new ResourceConfig(jobName);
-    resourceConfig.putSimpleConfigs(jobConfig.getResourceConfigMap());
-    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
-    if (taskConfigMap != null) {
-      for (TaskConfig taskConfig : taskConfigMap.values()) {
-        resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
-      }
-    }
-
-    if (!_accessor.setProperty(keyBuilder.resourceConfig(jobName), resourceConfig)) {
+    JobConfig newJobCfg = new JobConfig(jobName, jobConfig);
+    if (!TaskUtil.setResouceConfig(_accessor, jobName, newJobCfg)) {
       LOG.error("Failed to add job configuration for job " + jobName);
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/1eb40c3f/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 0d30f54..f29f2d0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -34,6 +34,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.log4j.Logger;
@@ -65,17 +66,7 @@ public class TaskUtil {
     if (jobResourceConfig == null) {
       return null;
     }
-    JobConfig.Builder b =
-        JobConfig.Builder.fromMap(jobResourceConfig.getRecord().getSimpleFields());
-    Map<String, Map<String, String>> rawTaskConfigMap =
-        jobResourceConfig.getRecord().getMapFields();
-    Map<String, TaskConfig> taskConfigMap = Maps.newHashMap();
-    for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
-      TaskConfig taskConfig = TaskConfig.Builder.from(rawTaskConfig);
-      taskConfigMap.put(taskConfig.getId(), taskConfig);
-    }
-    b.addTaskConfigMap(taskConfigMap);
-    return b.build();
+    return new JobConfig(jobResourceConfig);
   }
 
   /**
@@ -106,10 +97,7 @@ public class TaskUtil {
       return null;
     }
 
-    WorkflowConfig.Builder b =
-        WorkflowConfig.Builder.fromMap(workflowCfg.getRecord().getSimpleFields());
-
-    return b.build();
+    return new WorkflowConfig(workflowCfg);
   }
 
   /**
@@ -126,6 +114,19 @@ public class TaskUtil {
   }
 
   /**
+   * Set the resource config
+   * @param accessor        Accessor to Helix configs
+   * @param resource        The resource name
+   * @param resourceConfig  The resource config to be set
+   * @return                True if set successfully, otherwise false
+   */
+  protected static boolean setResouceConfig(HelixDataAccessor accessor, String resource,
+      ResourceConfig resourceConfig) {
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    return accessor.setProperty(keyBuilder.resourceConfig(resource), resourceConfig);
+  }
+
+  /**
    * Get a Helix configuration scope at a resource (i.e. job and workflow) level
    *
    * @param clusterName the cluster containing the resource

http://git-wip-us.apache.org/repos/asf/helix/blob/1eb40c3f/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 17259d2..9f71e35 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -18,24 +18,25 @@ package org.apache.helix.task;
  * specific language governing permissions and limitations
  * under the License.
  */
+
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.helix.HelixException;
-import org.apache.helix.task.beans.WorkflowBean;
 import org.apache.helix.HelixProperty;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.task.beans.WorkflowBean;
 import org.apache.log4j.Logger;
 
 /**
  * Provides a typed interface to workflow level configurations. Validates the configurations.
  */
-// TODO: extends WorkflowConfig from ResourceConfig
-public class  WorkflowConfig {
+public class  WorkflowConfig extends ResourceConfig {
   private static final Logger LOG = Logger.getLogger(WorkflowConfig.class);
 
   /**
@@ -46,6 +47,7 @@ public class  WorkflowConfig {
    * but it will be change to protected in future major release.
    */
   public enum WorkflowConfigProperty {
+    WorkflowID,
     Dag,
     ParallelJobs,
     TargetState,
@@ -58,63 +60,113 @@ public class  WorkflowConfig {
     /* this is for non-terminable workflow. */
     capacity,
     WorkflowType,
+    JobTypes,
     IsJobQueue
   }
 
   /* Default values */
   public static final long DEFAULT_EXPIRY = 24 * 60 * 60 * 1000;
   public static final int DEFAULT_FAILURE_THRESHOLD = 0;
+  public static final int DEFAULT_PARALLEL_JOBS = 1;
+  public static final int DEFAULT_CAPACITY = Integer.MAX_VALUE;
+  public static final JobDag DEFAULT_JOB_DAG = JobDag.EMPTY_DAG;
+  public static final TargetState DEFAULT_TARGET_STATE = TargetState.START;
+  public static final boolean DEFAULT_TERMINABLE = true;
+  public static final boolean DEFAULT_JOB_QUEUE = false;
+  public static final boolean DEFAULT_MONITOR_DISABLE = true;
+
+  public WorkflowConfig(HelixProperty property) {
+    super(property.getRecord());
+  }
+
+  public WorkflowConfig(WorkflowConfig cfg, String workflowId) {
+    this(workflowId, cfg.getJobDag(), cfg.getParallelJobs(), cfg.getTargetState(), cfg.getExpiry(),
+        cfg.getFailureThreshold(), cfg.isTerminable(), cfg.getScheduleConfig(), cfg.getCapacity(),
+        cfg.getWorkflowType(), cfg.isJobQueue(), cfg.getJobTypes());
+  }
 
   /* Member variables */
   // TODO: jobDag should not be in the workflowConfig.
-  private final JobDag _jobDag;
-
-  // _parallelJobs would kind of break the job dependency,
-  // e.g: if job1 -> job2, but _parallelJobs == 2,
-  // then job1 and job2 could be scheduled at the same time
-  private final int _parallelJobs;
-  private final TargetState _targetState;
-  private final long _expiry;
-  private final boolean _terminable;
-  private final ScheduleConfig _scheduleConfig;
-  private final int _failureThreshold;
-  private final int _capacity;
-  private final String _workflowType;
-  private final boolean _isJobQueue;
-
-  protected WorkflowConfig(JobDag jobDag, int parallelJobs, TargetState targetState, long expiry,
-      int failureThreshold, boolean terminable, ScheduleConfig scheduleConfig, int capacity,
-      String workflowType, boolean isJobQueue) {
-    _jobDag = jobDag;
-    _parallelJobs = parallelJobs;
-    _targetState = targetState;
-    _expiry = expiry;
-    _failureThreshold = failureThreshold;
-    _terminable = terminable;
-    _scheduleConfig = scheduleConfig;
-    _capacity = capacity;
-    _workflowType = workflowType;
-    _isJobQueue = isJobQueue;
+
+  protected WorkflowConfig(String workflowId, JobDag jobDag, int parallelJobs,
+      TargetState targetState, long expiry, int failureThreshold, boolean terminable,
+      ScheduleConfig scheduleConfig, int capacity, String workflowType, boolean isJobQueue,
+      Map<String, String> jobTypes) {
+    super(workflowId);
+
+    putSimpleConfig(WorkflowConfigProperty.WorkflowID.name(), workflowId);
+    try {
+      putSimpleConfig(WorkflowConfigProperty.Dag.name(), jobDag.toJson());
+    } catch (IOException ex) {
+      throw new HelixException("Invalid job dag configuration!", ex);
+    }
+    putSimpleConfig(WorkflowConfigProperty.ParallelJobs.name(), String.valueOf(parallelJobs));
+    putSimpleConfig(WorkflowConfigProperty.Expiry.name(), String.valueOf(expiry));
+    putSimpleConfig(WorkflowConfigProperty.TargetState.name(), targetState.name());
+    putSimpleConfig(WorkflowConfigProperty.Terminable.name(), String.valueOf(terminable));
+    putSimpleConfig(WorkflowConfigProperty.IsJobQueue.name(), String.valueOf(isJobQueue));
+    putSimpleConfig(WorkflowConfigProperty.FailureThreshold.name(),
+        String.valueOf(failureThreshold));
+
+    if (capacity > 0) {
+      putSimpleConfig(WorkflowConfigProperty.capacity.name(), String.valueOf(capacity));
+    }
+
+    // Populate schedule if present
+    if (scheduleConfig != null) {
+      Date startTime = scheduleConfig.getStartTime();
+      if (startTime != null) {
+        String formattedTime = WorkflowConfig.getDefaultDateFormat().format(startTime);
+        putSimpleConfig(WorkflowConfigProperty.StartTime.name(), formattedTime);
+      }
+      if (scheduleConfig.isRecurring()) {
+        putSimpleConfig(WorkflowConfigProperty.RecurrenceUnit.name(),
+            scheduleConfig.getRecurrenceUnit().toString());
+        putSimpleConfig(WorkflowConfigProperty.RecurrenceInterval.name(),
+            scheduleConfig.getRecurrenceInterval().toString());
+      }
+    }
+    if (workflowType != null) {
+      putSimpleConfig(WorkflowConfigProperty.WorkflowType.name(), workflowType);
+    }
+
+    if (jobTypes != null) {
+      putMapConfig(WorkflowConfigProperty.JobTypes.name(), jobTypes);
+    }
+    putSimpleConfig(ResourceConfigProperty.MONITORING_DISABLED.toString(),
+        String.valueOf(DEFAULT_MONITOR_DISABLE));
+  }
+
+  public String getWorkflowId() {
+    return getSimpleConfig(WorkflowConfigProperty.WorkflowID.name());
   }
 
   public JobDag getJobDag() {
-    return _jobDag;
+    return simpleConfigContains(WorkflowConfigProperty.Dag.name()) ? JobDag
+        .fromJson(getSimpleConfig(WorkflowConfigProperty.Dag.name())) : DEFAULT_JOB_DAG;
   }
 
   public int getParallelJobs() {
-    return _parallelJobs;
+    return _record
+        .getIntField(WorkflowConfigProperty.ParallelJobs.name(), DEFAULT_PARALLEL_JOBS);
   }
 
   public TargetState getTargetState() {
-    return _targetState;
+    return simpleConfigContains(WorkflowConfigProperty.TargetState.name()) ? TargetState
+        .valueOf(getSimpleConfig(WorkflowConfigProperty.TargetState.name())) : DEFAULT_TARGET_STATE;
   }
 
   public long getExpiry() {
-    return _expiry;
+    return _record.getLongField(WorkflowConfigProperty.Expiry.name(), DEFAULT_EXPIRY);
   }
 
+  /**
+   * The failure threshold only applies to a generic workflow; it is ignored by a JobQueue.
+   * @return the failure threshold
+   */
   public int getFailureThreshold() {
-    return _failureThreshold;
+    return _record
+        .getIntField(WorkflowConfigProperty.FailureThreshold.name(), DEFAULT_FAILURE_THRESHOLD);
   }
 
   /**
@@ -122,26 +174,40 @@ public class  WorkflowConfig {
    * this field is only used when a workflow is not terminable.
    * @return queue capacity
    */
-  public int getCapacity() { return _capacity; }
+  public int getCapacity() {
+    return _record.getIntField(WorkflowConfigProperty.capacity.name(), DEFAULT_CAPACITY);
+  }
 
   public String getWorkflowType() {
-    return _workflowType;
+    return simpleConfigContains(WorkflowConfigProperty.WorkflowType.name()) ? getSimpleConfig(
+        WorkflowConfigProperty.WorkflowType.name()) : null;
   }
 
   public boolean isTerminable() {
-    return _terminable;
+    return _record.getBooleanField(WorkflowConfigProperty.Terminable.name(), DEFAULT_TERMINABLE);
   }
 
   public ScheduleConfig getScheduleConfig() {
-    return _scheduleConfig;
+    return parseScheduleFromConfigMap(getSimpleConfigs());
   }
 
   public boolean isRecurring() {
-    return _scheduleConfig != null && _scheduleConfig.isRecurring();
+    return simpleConfigContains(WorkflowConfigProperty.StartTime.name()) && simpleConfigContains(
+        WorkflowConfigProperty.RecurrenceInterval.name()) && simpleConfigContains(
+        WorkflowConfigProperty.RecurrenceUnit.name());
   }
 
   public boolean isJobQueue() {
-    return _isJobQueue;
+    return _record.getBooleanField(WorkflowConfigProperty.IsJobQueue.name(), DEFAULT_JOB_QUEUE);
+  }
+
+  protected void setJobTypes(Map<String, String> jobTypes) {
+    putMapConfig(WorkflowConfigProperty.JobTypes.name(), jobTypes);
+  }
+
+  public Map<String, String> getJobTypes() {
+    return mapConfigContains(WorkflowConfigProperty.JobTypes.name()) ? getMapConfig(
+        WorkflowConfigProperty.JobTypes.name()) : null;
   }
 
   public static SimpleDateFormat getDefaultDateFormat() {
@@ -159,51 +225,19 @@ public class  WorkflowConfig {
    */
   public Date getStartTime() {
     // Workflow with non-scheduled config is ready to schedule immediately.
-    if (_scheduleConfig == null) {
-      return null;
+    try {
+      return simpleConfigContains(WorkflowConfigProperty.StartTime.name())
+          ? WorkflowConfig.getDefaultDateFormat()
+          .parse(getSimpleConfig(WorkflowConfigProperty.StartTime.name()))
+          : null;
+    } catch (ParseException e) {
+      LOG.error("Unparseable date " + getSimpleConfig(WorkflowConfigProperty.StartTime.name()), e);
     }
-
-    return _scheduleConfig.getStartTime();
+    return null;
   }
 
   public Map<String, String> getResourceConfigMap() {
-    Map<String, String> cfgMap = new HashMap<String, String>();
-    try {
-      cfgMap.put(WorkflowConfigProperty.Dag.name(), getJobDag().toJson());
-    } catch (IOException ex) {
-      throw new HelixException("Invalid job dag configuration!", ex);
-    }
-    cfgMap.put(WorkflowConfigProperty.ParallelJobs.name(), String.valueOf(getParallelJobs()));
-    cfgMap.put(WorkflowConfigProperty.Expiry.name(), String.valueOf(getExpiry()));
-    cfgMap.put(WorkflowConfigProperty.TargetState.name(), getTargetState().name());
-    cfgMap.put(WorkflowConfigProperty.Terminable.name(), String.valueOf(isTerminable()));
-    cfgMap.put(WorkflowConfigProperty.IsJobQueue.name(), String.valueOf(isJobQueue()));
-    cfgMap.put(WorkflowConfigProperty.FailureThreshold.name(),
-        String.valueOf(getFailureThreshold()));
-
-    if (_capacity > 0) {
-      cfgMap.put(WorkflowConfigProperty.capacity.name(), String.valueOf(_capacity));
-    }
-
-    // Populate schedule if present
-    ScheduleConfig scheduleConfig = getScheduleConfig();
-    if (scheduleConfig != null) {
-      Date startTime = scheduleConfig.getStartTime();
-      if (startTime != null) {
-        String formattedTime = WorkflowConfig.getDefaultDateFormat().format(startTime);
-        cfgMap.put(WorkflowConfigProperty.StartTime.name(), formattedTime);
-      }
-      if (scheduleConfig.isRecurring()) {
-        cfgMap.put(WorkflowConfigProperty.RecurrenceUnit.name(),
-            scheduleConfig.getRecurrenceUnit().toString());
-        cfgMap.put(WorkflowConfigProperty.RecurrenceInterval.name(),
-            scheduleConfig.getRecurrenceInterval().toString());
-      }
-    }
-    if (_workflowType != null) {
-      cfgMap.put(WorkflowConfigProperty.WorkflowType.name(), _workflowType);
-    }
-    return cfgMap;
+    return getSimpleConfigs();
   }
 
   /**
@@ -249,27 +283,31 @@ public class  WorkflowConfig {
   }
 
   public static class Builder {
-    private JobDag _taskDag = JobDag.EMPTY_DAG;
-    private int _parallelJobs = 1;
-    private TargetState _targetState = TargetState.START;
+    private String _workflowId = "";
+    private JobDag _taskDag = DEFAULT_JOB_DAG;
+    private int _parallelJobs = DEFAULT_PARALLEL_JOBS;
+    private TargetState _targetState = DEFAULT_TARGET_STATE;
     private long _expiry = DEFAULT_EXPIRY;
     private int _failureThreshold = DEFAULT_FAILURE_THRESHOLD;
-    private boolean _isTerminable = true;
-    private int _capacity = Integer.MAX_VALUE;
+    private boolean _isTerminable = DEFAULT_TERMINABLE;
+    private int _capacity = DEFAULT_CAPACITY;
     private ScheduleConfig _scheduleConfig;
     private String _workflowType;
-    private boolean _isJobQueue = false;
+    private boolean _isJobQueue = DEFAULT_JOB_QUEUE;
+    private Map<String, String> _jobTypes;
 
     public WorkflowConfig build() {
       validate();
 
-      return new WorkflowConfig(_taskDag, _parallelJobs, _targetState, _expiry, _failureThreshold,
-          _isTerminable, _scheduleConfig, _capacity, _workflowType, _isJobQueue);
+      return new WorkflowConfig(_workflowId, _taskDag, _parallelJobs, _targetState, _expiry,
+          _failureThreshold, _isTerminable, _scheduleConfig, _capacity, _workflowType, _isJobQueue,
+          _jobTypes);
     }
 
     public Builder() {}
 
     public Builder(WorkflowConfig workflowConfig) {
+      _workflowId = workflowConfig.getWorkflowId();
       _taskDag = workflowConfig.getJobDag();
       _parallelJobs = workflowConfig.getParallelJobs();
       _targetState = workflowConfig.getTargetState();
@@ -280,6 +318,12 @@ public class  WorkflowConfig {
       _failureThreshold = workflowConfig.getFailureThreshold();
       _workflowType = workflowConfig.getWorkflowType();
       _isJobQueue = workflowConfig.isJobQueue();
+      _jobTypes = workflowConfig.getJobTypes();
+    }
+
+    public Builder setWorkflowId(String v) {
+      _workflowId = v;
+      return this;
     }
 
     protected Builder setJobDag(JobDag v) {
@@ -352,7 +396,7 @@ public class  WorkflowConfig {
       builder.setConfigMap(cfg);
       return builder;
     }
-
+    // TODO: Add an API to set map fields. This API only sets simple fields.
     public Builder setConfigMap(Map<String, String> cfg) {
       if (cfg.containsKey(WorkflowConfigProperty.Expiry.name())) {
         setExpiry(Long.parseLong(cfg.get(WorkflowConfigProperty.Expiry.name())));
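
A rough sketch of what backing WorkflowConfig by ResourceConfig enables, assuming a
connected HelixDataAccessor named accessor and a workflow called "myWorkflow" (both
names illustrative): the config can now be rehydrated straight from the resource
config ZNode instead of being rebuilt field by field through the Builder.

    HelixProperty workflowProperty =
        accessor.getProperty(accessor.keyBuilder().resourceConfig("myWorkflow"));
    WorkflowConfig workflowConfig = new WorkflowConfig(workflowProperty);
    JobDag dag = workflowConfig.getJobDag();        // falls back to DEFAULT_JOB_DAG if missing
    long expiry = workflowConfig.getExpiry();       // falls back to DEFAULT_EXPIRY
    boolean isQueue = workflowConfig.isJobQueue();  // falls back to DEFAULT_JOB_QUEUE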

http://git-wip-us.apache.org/repos/asf/helix/blob/1eb40c3f/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index 9a376f8..b781a54 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -43,6 +43,8 @@ public class JobBean {
   public int maxAttemptsPerTask = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK;
   public int failureThreshold = JobConfig.DEFAULT_FAILURE_THRESHOLD;
   public long taskRetryDelay = JobConfig.DEFAULT_TASK_RETRY_DELAY;
+  public long executionDelay = JobConfig.DEFAULT_Job_EXECUTION_DELAY_TIME;
+  public long executionStart = JobConfig.DEFAULT_JOB_EXECUTION_START_TIME;
   public boolean disableExternalView = JobConfig.DEFAULT_DISABLE_EXTERNALVIEW;
   public boolean ignoreDependentJobFailure = JobConfig.DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
   public int numberOfTasks = JobConfig.DEFAULT_NUMBER_OF_TASKS;

http://git-wip-us.apache.org/repos/asf/helix/blob/1eb40c3f/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
index dbc4154..52a0d5c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
@@ -59,9 +59,11 @@ public class TestTaskRebalancerParallel extends TaskTestBase {
               .setTargetPartitionStates(Collections.singleton("SLAVE")));
     }
 
+    _driver.stop(queueName);
     for (int i = 0; i < jobConfigBuilders.size(); ++i) {
       _driver.enqueueJob(queueName, "job_" + (i + 1), jobConfigBuilders.get(i));
     }
+    _driver.resume(queueName);
 
     Assert.assertTrue(TaskTestUtil.pollForWorkflowParallelState(_driver, queueName));
   }
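
The pattern this test now follows is useful whenever several jobs should land in a
queue before any of them starts. A sketch with assumed names (driver is a connected
TaskDriver, "myQueue" an existing job queue, jobBuilders a list of JobConfig.Builder
instances), run from a method that declares throws Exception as the test method does:

    driver.stop("myQueue");                 // pause the queue so nothing is scheduled yet
    for (int i = 0; i < jobBuilders.size(); i++) {
      driver.enqueueJob("myQueue", "job_" + (i + 1), jobBuilders.get(i));
    }
    driver.resume("myQueue");               // let the rebalancer pick up all jobs together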

http://git-wip-us.apache.org/repos/asf/helix/blob/1eb40c3f/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRetryDelay.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRetryDelay.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRetryDelay.java
index 47624e4..9c91457 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRetryDelay.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRetryDelay.java
@@ -41,7 +41,7 @@ public class TestTaskRetryDelay extends TaskTestBase {
     String jobResource = TestHelper.getTestMethodName();
     JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
     jobBuilder.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG)
-        .setMaxAttemptsPerTask(2).setCommand(MockTask.TASK_COMMAND)
+        .setMaxAttemptsPerTask(2).setCommand(MockTask.TASK_COMMAND).setWorkflow(jobResource)
         .setFailureThreshold(Integer.MAX_VALUE).setTaskRetryDelay(2000L)
         .setJobCommandConfigMap(ImmutableMap.of(MockTask.FAILURE_COUNT_BEFORE_SUCCESS, "2"));
     Workflow flow =

http://git-wip-us.apache.org/repos/asf/helix/blob/1eb40c3f/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java
index 13cd531..eb90a34 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java
@@ -129,7 +129,7 @@ public class TestUserContentStore extends TaskTestBase {
     jobCommandMap.put("Timeout", "1000");
 
     JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
-        .addTaskConfigs(taskConfigs)
+        .addTaskConfigs(taskConfigs).setWorkflow(jobName)
         .setJobCommandConfigMap(jobCommandMap);
     workflowBuilder.addJob(jobName, jobBuilder);
 
@@ -158,10 +158,10 @@ public class TestUserContentStore extends TaskTestBase {
 
     JobConfig.Builder jobBuilder1 =
         new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs1)
-            .setJobCommandConfigMap(jobCommandMap);
+            .setJobCommandConfigMap(jobCommandMap).setWorkflow(queueName);
     JobConfig.Builder jobBuilder2 =
         new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs2)
-            .setJobCommandConfigMap(jobCommandMap);
+            .setJobCommandConfigMap(jobCommandMap).setWorkflow(queueName);
 
     queueBuilder.enqueueJob(queueName + 0, jobBuilder1);
     queueBuilder.enqueueJob(queueName + 1, jobBuilder2);


[4/9] helix git commit: [HELIX-644] Add accessor method for getting zkSerializer from ZkClient

Posted by lx...@apache.org.
[HELIX-644] Add accessor method for getting zkSerializer from ZkClient


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/44209b84
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/44209b84
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/44209b84

Branch: refs/heads/helix-0.6.x
Commit: 44209b846b5ba17d04799508947c391526991152
Parents: 488ce4c
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Dec 15 16:48:38 2016 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Dec 15 16:48:38 2016 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/helix/manager/zk/ZkClient.java   | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/44209b84/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index 48feacc..47d31d1 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -100,9 +100,6 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
     this(new ZkConnection(zkServers), Integer.MAX_VALUE, new SerializableSerializer());
   }
 
-  {
-  }
-
   @Override
   public void setZkSerializer(ZkSerializer zkSerializer) {
     _zkSerializer = new BasicZkSerializer(zkSerializer);
@@ -112,6 +109,10 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
     _zkSerializer = zkSerializer;
   }
 
+  public PathBasedZkSerializer getZkSerializer() {
+    return _zkSerializer;
+  }
+
   public IZkConnection getConnection() {
     return _connection;
   }
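
A small usage sketch for the new accessor; the connect string, path, and record name
are placeholders, and the serializer classes come from org.apache.helix.manager.zk.
It lets callers reuse whatever serializer the client was configured with, for example
to produce the exact bytes the client would write for a given path:

    ZkClient zkClient = new ZkClient("localhost:2181");
    zkClient.setZkSerializer(new ZNRecordSerializer());
    PathBasedZkSerializer serializer = zkClient.getZkSerializer();
    ZNRecord record = new ZNRecord("myResource");
    byte[] bytes = serializer.serialize(record, "/MYCLUSTER/CONFIGS/RESOURCE/myResource");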


[8/9] helix git commit: [HELIX-649] Fix StateModelDef name is not consistent

Posted by lx...@apache.org.
[HELIX-649] Fix StateModelDef name is not consistent

The StateModelDef name is not consistent with the name the user provided, since the ZNRecord id of the StateModelDefinition is used as the key.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4c3fc7f6
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4c3fc7f6
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4c3fc7f6

Branch: refs/heads/helix-0.6.x
Commit: 4c3fc7f6613e04ba85b88f5acbd6a16044331847
Parents: 1eb40c3
Author: Junkai Xue <jx...@linkedin.com>
Authored: Fri Dec 16 15:26:22 2016 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Fri Dec 16 15:26:22 2016 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/4c3fc7f6/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 73f2cbb..c7fa2ae 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -763,7 +763,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
-    accessor.setProperty(keyBuilder.stateModelDef(stateModel.getId()), stateModel);
+    accessor.setProperty(keyBuilder.stateModelDef(stateModelDef), stateModel);
   }
 
   @Override
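
A sketch of the call this fixes, with placeholder cluster name and ZK address: the
definition is now stored under the name the caller passes in, instead of falling back
to the ZNRecord id of the StateModelDefinition, so the two no longer have to match.

    HelixAdmin admin = new ZKHelixAdmin("localhost:2181");
    StateModelDefinition masterSlave =
        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
    admin.addStateModelDef("MYCLUSTER", "MasterSlave", masterSlave);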


[3/9] helix git commit: [HELIX-643] Make instance variables in DistClusterControllerStateModel protected so they are visible to its subclasses.

Posted by lx...@apache.org.
[HELIX-643] Make instance variables in DistClusterControllerStateModel protected so they are visible to its subclasses.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/488ce4c3
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/488ce4c3
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/488ce4c3

Branch: refs/heads/helix-0.6.x
Commit: 488ce4c34a60505bf7d70df860f0df9dd5fa9642
Parents: 1db2438
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Dec 15 16:47:27 2016 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Dec 15 16:47:27 2016 -0800

----------------------------------------------------------------------
 .../helix/participant/DistClusterControllerStateModel.java       | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/488ce4c3/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
index 8c88b7c..d1faaa9 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
@@ -36,8 +36,8 @@ import org.apache.log4j.Logger;
 })
 public class DistClusterControllerStateModel extends StateModel {
   private static Logger logger = Logger.getLogger(DistClusterControllerStateModel.class);
-  private HelixManager _controller = null;
-  private final String _zkAddr;
+  protected HelixManager _controller = null;
+  protected final String _zkAddr;
 
   public DistClusterControllerStateModel(String zkAddr) {
     StateModelParser parser = new StateModelParser();
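
With the two fields now protected, a subclass can read them directly. A minimal,
hypothetical example (the class and method names below are made up for illustration):

    import org.apache.helix.participant.DistClusterControllerStateModel;
    import org.apache.log4j.Logger;

    public class VerboseDistControllerStateModel extends DistClusterControllerStateModel {
      private static final Logger LOG = Logger.getLogger(VerboseDistControllerStateModel.class);

      public VerboseDistControllerStateModel(String zkAddr) {
        super(zkAddr);
      }

      protected void logControllerState() {
        // _zkAddr and _controller are inherited and now visible here
        LOG.info("zkAddr=" + _zkAddr + ", controller connected="
            + (_controller != null && _controller.isConnected()));
      }
    }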


[2/9] helix git commit: [HELIX-642] Disable the participant instance once it disconnected due to unstable ZK

Posted by lx...@apache.org.
[HELIX-642] Disable the participant instance once it disconnected due to unstable ZK

When ZK is unstable, the instance's connection will be dropped. Before disconnecting from ZK, disable the instance.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1db24388
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1db24388
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1db24388

Branch: refs/heads/helix-0.6.x
Commit: 1db243887d54feee787a2ddb66bce9e34fab9afe
Parents: a9267e5
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Dec 15 16:46:11 2016 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Dec 15 16:46:11 2016 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/helix/manager/zk/ZKHelixManager.java | 7 +++++++
 1 file changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1db24388/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 16b0882..6f4a874 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -826,6 +826,13 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
         LOG.error("instanceName: " + _instanceName + " is flapping. disconnect it. "
             + " maxDisconnectThreshold: " + _maxDisconnectThreshold + " disconnects in "
             + _flappingTimeWindowMs + "ms.");
+
+        // Only disable the instance when its instance type is PARTICIPANT
+        if (_instanceType.equals(InstanceType.PARTICIPANT)) {
+          LOG.warn("instanceName: " + _instanceName
+              + " is flapping. Since it is a participant, disable it.");
+          getClusterManagmentTool().enableInstance(_clusterName, _instanceName, false);
+        }
         disconnect();
       }
       break;
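
One operational note, with placeholder names below: an instance disabled this way
stays disabled in its instance config after it reconnects, so an operator has to
re-enable it explicitly, for example:

    HelixAdmin admin = new ZKHelixAdmin("localhost:2181");
    admin.enableInstance("MYCLUSTER", "localhost_12918", true);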