You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2015/07/29 06:28:16 UTC

helix git commit: [Helix-606] Add an option in IdealState to allow a resource to disable showing external view.

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 8006547ba -> b72ff29d1


[Helix-606] Add an option in IdealState to allow a resource to disable showing external view.

Add an option in IdealState that lets a resource opt out of having its external view published. This adds flexibility for resources whose external view the client or application does not care about (such as scheduled jobs), and reduces ZK traffic when there are a large number of external view listeners.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b72ff29d
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b72ff29d
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b72ff29d

Branch: refs/heads/helix-0.6.x
Commit: b72ff29d1fc2845affb9ee943396424c5a7e5721
Parents: 8006547
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon Jul 13 18:03:33 2015 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Jul 28 21:20:56 2015 -0700

----------------------------------------------------------------------
 .../stages/ExternalViewComputeStage.java        |  34 +++-
 .../java/org/apache/helix/model/IdealState.java |  21 ++-
 .../helix/model/builder/IdealStateBuilder.java  |  16 ++
 .../java/org/apache/helix/task/JobConfig.java   |  24 ++-
 .../java/org/apache/helix/task/TaskDriver.java  |   5 +
 .../helix/tools/ClusterStateVerifier.java       |   9 +-
 .../java/org/apache/helix/ZkUnitTestBase.java   |   1 +
 .../integration/TestDisableExternalView.java    | 166 +++++++++++++++++++
 .../integration/task/TestRecurringJobQueue.java |  67 +++++++-
 .../apache/helix/integration/task/TestUtil.java |  17 +-
 10 files changed, 340 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/b72ff29d/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index d0bdbd9..1455cd5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -21,6 +21,7 @@ package org.apache.helix.controller.stages;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -72,7 +73,6 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
 
     List<ExternalView> newExtViews = new ArrayList<ExternalView>();
-    List<PropertyKey> keys = new ArrayList<PropertyKey>();
 
     Map<String, ExternalView> curExtViews =
         dataAccessor.getChildValuesMap(keyBuilder.externalViews());
@@ -106,7 +106,7 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
       // Update cluster status monitor mbean
       ClusterStatusMonitor clusterStatusMonitor =
           (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
-      IdealState idealState = cache._idealStateMap.get(view.getResourceName());
+      IdealState idealState = cache._idealStateMap.get(resourceName);
       if (idealState != null) {
         if (clusterStatusMonitor != null
             && !idealState.getStateModelDefRef().equalsIgnoreCase(
@@ -131,25 +131,41 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
 
       // compare the new external view with current one, set only on different
       if (curExtView == null || !curExtView.getRecord().equals(view.getRecord())) {
-        keys.add(keyBuilder.externalView(resourceName));
+        // Add external view to the list which will be written to ZK later.
         newExtViews.add(view);
 
         // For SCHEDULER_TASK_RESOURCE resource group (helix task queue), we need to find out which
-        // task
-        // partitions are finished (COMPLETED or ERROR), update the status update of the original
-        // scheduler
-        // message, and then remove the partitions from the ideal state
+        // task partitions are finished (COMPLETED or ERROR), update the status update of the original
+        // scheduler message, and then remove the partitions from the ideal state
         if (idealState != null
             && idealState.getStateModelDefRef().equalsIgnoreCase(
-                DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+            DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
           updateScheduledTaskStatus(view, manager, idealState);
         }
-
       }
     }
     // TODO: consider not setting the externalview of SCHEDULER_TASK_QUEUE at all.
     // Are there any entity that will be interested in its change?
 
+    // For resources with the DisableExternalView option turned on in IdealState,
+    // we will not actually create or write the externalView to ZooKeeper.
+    List<PropertyKey> keys = new ArrayList<PropertyKey>();
+    for(Iterator<ExternalView> it = newExtViews.iterator(); it.hasNext(); ) {
+      ExternalView view = it.next();
+      String resourceName = view.getResourceName();
+      IdealState idealState = cache._idealStateMap.get(resourceName);
+      if (idealState != null && idealState.isExternalViewDisabled()) {
+        it.remove();
+        // remove the external view if the external view exists
+        if (curExtViews.containsKey(resourceName)) {
+          LOG.info("Remove externalView for resource: " + resourceName);
+          dataAccessor.removeProperty(keyBuilder.externalView(resourceName));
+        }
+      } else {
+        keys.add(keyBuilder.externalView(resourceName));
+      }
+    }
+
     // add/update external-views
     if (newExtViews.size() > 0) {
       dataAccessor.setChildren(keys, newExtViews);

http://git-wip-us.apache.org/repos/asf/helix/blob/b72ff29d/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index d2744ac..696de7a 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -57,7 +57,8 @@ public class IdealState extends HelixProperty {
     REBALANCER_CLASS_NAME,
     HELIX_ENABLED,
     RESOURCE_GROUP_NAME,
-    GROUP_ROUTING_ENABLED
+    GROUP_ROUTING_ENABLED,
+    EXTERNAL_VIEW_DISABLED
   }
 
   public static final String QUERY_LIST = "PREFERENCE_LIST_QUERYS";
@@ -199,6 +200,24 @@ public class IdealState extends HelixProperty {
   }
 
   /**
+   * Whether the external view for this resource is disabled. By default, it is false.
+   *
+   * @return true if the external view should be disabled for this resource.
+   */
+  public boolean isExternalViewDisabled() {
+    return _record.getBooleanField(IdealStateProperty.EXTERNAL_VIEW_DISABLED.name(), false);
+  }
+
+  /**
+   * Disable (true) or enable (false) External View for this resource.
+   */
+  public void setDisableExternalView(boolean disableExternalView) {
+    _record
+        .setSimpleField(IdealStateProperty.EXTERNAL_VIEW_DISABLED.name(),
+            Boolean.toString(disableExternalView));
+  }
+
+  /**
    * Set the maximum number of partitions of this resource that an instance can serve
    * @param max the maximum number of partitions supported
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/b72ff29d/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
index a7c0335..d3bc3f2 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
@@ -63,6 +63,11 @@ public abstract class IdealStateBuilder {
    */
   private String nodeGroup = "*";
 
+  /**
+   * Whether to disable external view for this resource
+   */
+  private Boolean disableExternalView = null;
+
   protected ZNRecord _record;
 
   /**
@@ -122,6 +127,14 @@ public abstract class IdealStateBuilder {
   }
 
   /**
+   * @param disableExternalView
+   */
+  public IdealStateBuilder setDisableExternalView(boolean disableExternalView) {
+    this.disableExternalView = disableExternalView;
+    return this;
+  }
+
+  /**
    * sub-class should implement this to set ideal-state mode
    * @return
    */
@@ -141,6 +154,9 @@ public abstract class IdealStateBuilder {
     idealstate.setStateModelFactoryName(stateModelFactoryName);
     idealstate.setRebalanceMode(rebalancerMode);
     idealstate.setReplicas("" + numReplica);
+    if (disableExternalView != null) {
+      idealstate.setDisableExternalView(disableExternalView);
+    }
 
     if (!idealstate.isValid()) {
       throw new HelixException("invalid ideal-state: " + idealstate);

http://git-wip-us.apache.org/repos/asf/helix/blob/b72ff29d/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 780db55..30f76b7 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -73,6 +73,10 @@ public class JobConfig {
   /** The individual task configurations, if any **/
   public static final String TASK_CONFIGS = "TaskConfigs";
 
+  /** Disable external view (not showing) for this job resource */
+  public static final String DISABLE_EXTERNALVIEW = "DisableExternalView";
+
+
   // // Default property values ////
 
   public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr.
@@ -81,6 +85,7 @@ public class JobConfig {
   public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1;
   public static final int DEFAULT_FAILURE_THRESHOLD = 0;
   public static final int DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK = 0;
+  public static final boolean DEFAULT_DISABLE_EXTERNALVIEW = false;
 
   private final String _workflow;
   private final String _targetResource;
@@ -94,12 +99,14 @@ public class JobConfig {
   private final int _maxForcedReassignmentsPerTask;
   private final int _failureThreshold;
   private final long _retryDelay;
+  private final boolean _disableExternalView;
   private final Map<String, TaskConfig> _taskConfigMap;
 
   private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
       Set<String> targetPartitionStates, String command, Map<String, String> jobCommandConfigMap,
       long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
       int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay,
+      boolean disableExternalView,
       Map<String, TaskConfig> taskConfigMap) {
     _workflow = workflow;
     _targetResource = targetResource;
@@ -113,6 +120,7 @@ public class JobConfig {
     _maxForcedReassignmentsPerTask = maxForcedReassignmentsPerTask;
     _failureThreshold = failureThreshold;
     _retryDelay = retryDelay;
+    _disableExternalView = disableExternalView;
     if (taskConfigMap != null) {
       _taskConfigMap = taskConfigMap;
     } else {
@@ -168,6 +176,10 @@ public class JobConfig {
     return _retryDelay;
   }
 
+  public boolean isDisableExternalView() {
+    return _disableExternalView;
+  }
+
   public Map<String, TaskConfig> getTaskConfigMap() {
     return _taskConfigMap;
   }
@@ -204,6 +216,7 @@ public class JobConfig {
     cfgMap.put(JobConfig.MAX_ATTEMPTS_PER_TASK, "" + _maxAttemptsPerTask);
     cfgMap.put(JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, "" + _maxForcedReassignmentsPerTask);
     cfgMap.put(JobConfig.FAILURE_THRESHOLD, "" + _failureThreshold);
+    cfgMap.put(JobConfig.DISABLE_EXTERNALVIEW, Boolean.toString(_disableExternalView));
     return cfgMap;
   }
 
@@ -224,6 +237,7 @@ public class JobConfig {
     private int _maxForcedReassignmentsPerTask = DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK;
     private int _failureThreshold = DEFAULT_FAILURE_THRESHOLD;
     private long _retryDelay = DEFAULT_TASK_RETRY_DELAY;
+    private boolean _disableExternalView = DEFAULT_DISABLE_EXTERNALVIEW;
 
     public JobConfig build() {
       validate();
@@ -231,7 +245,7 @@ public class JobConfig {
       return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
           _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
           _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay,
-          _taskConfigMap);
+          _disableExternalView, _taskConfigMap);
     }
 
     /**
@@ -282,6 +296,9 @@ public class JobConfig {
       if (cfg.containsKey(TASK_RETRY_DELAY)) {
         b.setTaskRetryDelay(Long.parseLong(cfg.get(TASK_RETRY_DELAY)));
       }
+      if (cfg.containsKey(DISABLE_EXTERNALVIEW)) {
+        b.setDisableExternalView(Boolean.valueOf(cfg.get(DISABLE_EXTERNALVIEW)));
+      }
       return b;
     }
 
@@ -345,6 +362,11 @@ public class JobConfig {
       return this;
     }
 
+    public Builder setDisableExternalView(boolean disableExternalView) {
+      _disableExternalView = disableExternalView;
+      return this;
+    }
+
     public Builder addTaskConfigs(List<TaskConfig> taskConfigs) {
       if (taskConfigs != null) {
         for (TaskConfig taskConfig : taskConfigs) {

http://git-wip-us.apache.org/repos/asf/helix/blob/b72ff29d/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index dcd13f2..b4b94f8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -533,6 +533,11 @@ public class TaskDriver {
     builder.setNumReplica(1);
     builder.setNumPartitions(numPartitions);
     builder.setStateModel(TaskConstants.STATE_MODEL_NAME);
+
+    if (jobConfig.isDisableExternalView()) {
+      builder.setDisableExternalView(jobConfig.isDisableExternalView());
+    }
+
     IdealState is = builder.build();
     for (int i = 0; i < numPartitions; i++) {
       is.getRecord().setListField(jobResource + "_" + i, new ArrayList<String>());

http://git-wip-us.apache.org/repos/asf/helix/blob/b72ff29d/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index 473a4df..b5b864c 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -320,8 +320,13 @@ public class ClusterStateVerifier {
       for (String resourceName : idealStates.keySet()) {
         ExternalView extView = extViews.get(resourceName);
         if (extView == null) {
-          LOG.info("externalView for " + resourceName + " is not available");
-          return false;
+          IdealState is = idealStates.get(resourceName);
+          if (is.isExternalViewDisabled()) {
+            continue;
+          } else {
+            LOG.info("externalView for " + resourceName + " is not available");
+            return false;
+          }
         }
 
         // step 0: remove empty map and DROPPED state from best possible state

http://git-wip-us.apache.org/repos/asf/helix/blob/b72ff29d/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
index 6f467e5..48312e7 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
@@ -76,6 +76,7 @@ public class ZkUnitTestBase {
   public void beforeSuite() throws Exception {
     _zkServer = TestHelper.startZkServer(ZK_ADDR);
     AssertJUnit.assertTrue(_zkServer != null);
+    ZKClientPool.reset();
 
     // System.out.println("Number of open zkClient before ZkUnitTests: "
     // + ZkClient.getNumberOfConnections());

http://git-wip-us.apache.org/repos/asf/helix/blob/b72ff29d/helix-core/src/test/java/org/apache/helix/integration/TestDisableExternalView.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableExternalView.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableExternalView.java
new file mode 100644
index 0000000..79ebec5
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableExternalView.java
@@ -0,0 +1,166 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.Date;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import com.google.common.collect.Lists;
+
+/**
+ * Test disable external-view in resource ideal state -
+ * if EXTERNAL_VIEW_DISABLED is set to true in a resource's idealstate,
+ * there should be no external view for this resource.
+ */
+public class TestDisableExternalView extends ZkIntegrationTestBase {
+  private static final String TEST_DB1 = "test_db1";
+  private static final String TEST_DB2 = "test_db2";
+
+  private static final int NODE_NR = 5;
+  private static final int START_PORT = 12918;
+  private static final String STATE_MODEL = "MasterSlave";
+  private static final int _PARTITIONS = 20;
+
+  private final String CLASS_NAME = getShortClassName();
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+  private MockParticipantManager[] _participants = new MockParticipantManager[NODE_NR];
+  private ClusterControllerManager _controller;
+  private String [] instances = new String[NODE_NR];
+
+  private ZKHelixAdmin _admin;
+
+  int _replica = 3;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    _admin = new ZKHelixAdmin(_gZkClient);
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    // setup storage cluster
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB1, _PARTITIONS, STATE_MODEL,
+        IdealState.RebalanceMode.FULL_AUTO + "");
+
+    IdealState idealState = _admin.getResourceIdealState(CLUSTER_NAME, TEST_DB1);
+    idealState.setDisableExternalView(true);
+    _admin.setResourceIdealState(CLUSTER_NAME, TEST_DB1, idealState);
+
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB2, _PARTITIONS, STATE_MODEL,
+        IdealState.RebalanceMode.FULL_AUTO + "");
+
+    for (int i = 0; i < NODE_NR; i++) {
+      instances[i] = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instances[i]);
+    }
+
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB1, _replica);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB2, _replica);
+
+    // start dummy participants
+    for (int i = 0; i < NODE_NR; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      participant.syncStart();
+      _participants[i] = participant;
+
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+  }
+
+  @Test
+  public void testDisableExternalView() throws InterruptedException {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    // verify external view for TEST_DB1 does not exist
+    ExternalView externalView = null;
+    externalView = accessor.getProperty(keyBuilder.externalView(TEST_DB1));
+    Assert.assertNull(externalView,
+        "There should be no external-view for " + TEST_DB1 + ", but is: " + externalView);
+
+    // verify external view for TEST_DB2 exists
+    externalView = accessor.getProperty(keyBuilder.externalView(TEST_DB2));
+    Assert.assertNotNull(externalView,
+        "Could not find external-view for " + TEST_DB2);
+
+    // disable external view in IS
+    IdealState idealState = _admin.getResourceIdealState(CLUSTER_NAME, TEST_DB2);
+    idealState.setDisableExternalView(true);
+    _admin.setResourceIdealState(CLUSTER_NAME, TEST_DB2, idealState);
+
+    // touch liveinstance to trigger externalview compute stage
+    String instance = PARTICIPANT_PREFIX + "_" + START_PORT;
+    HelixProperty liveInstance = accessor.getProperty(keyBuilder.liveInstance(instance));
+    accessor.setProperty(keyBuilder.liveInstance(instance), liveInstance);
+
+    // verify the external view for the db got removed
+    for (int i = 0; i < 10; i++) {
+      Thread.sleep(100);
+      externalView = accessor.getProperty(keyBuilder.externalView(TEST_DB2));
+      if (externalView == null) {
+        break;
+      }
+    }
+
+    Assert.assertNull(externalView, "external-view for " + TEST_DB2 + " should be removed, but was: "
+        + externalView);
+  }
+
+  @AfterClass
+  public void afterClass() {
+    // clean up
+    _controller.syncStop();
+    for (int i = 0; i < NODE_NR; i++) {
+      _participants[i].syncStop();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/b72ff29d/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index 011ed81..4656a23 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -32,9 +32,13 @@ import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
@@ -75,6 +79,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
   private HelixManager _manager;
   private TaskDriver _driver;
+  private ZKHelixDataAccessor _accessor;
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -83,6 +88,9 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
       _gZkClient.deleteRecursive(namespace);
     }
 
+    _accessor =
+        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+
     ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
     setupTool.addCluster(CLUSTER_NAME, true);
     for (int i = 0; i < n; i++) {
@@ -151,7 +159,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
   private JobQueue buildRecurrentJobQueue(String jobQueueName, int delayStart) {
     Map<String, String> cfgMap = new HashMap<String, String>();
-    cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(50000));
+    cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(500000));
     cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(120));
     cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, "SECONDS");
     Calendar cal = Calendar.getInstance();
@@ -264,7 +272,8 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     String deletedJob1 = currentJobNames.get(0);
     String namedSpaceDeletedJob1 = String.format("%s_%s", scheduledQueue, deletedJob1);
     TestUtil
-        .pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
+        .pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS,
+            TaskState.COMPLETED);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + scheduledQueue);
@@ -282,7 +291,8 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     // ensure job 2 is started
     TestUtil.pollForJobState(_manager, scheduledQueue,
-        String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.IN_PROGRESS);
+        String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.IN_PROGRESS,
+        TaskState.COMPLETED);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + queueName);
@@ -373,6 +383,57 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
         String.format("%s_%s", scheduledQueue, jobNames.get(JOB_COUNTS - 1)));
   }
 
+  @Test
+  public void testJobsDisableExternalView() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue queue = buildRecurrentJobQueue(queueName);
+    _driver.createQueue(queue);
+
+    // create jobs
+    Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500));
+
+    JobConfig.Builder job1 =
+        new JobConfig.Builder().setCommand("Reindex").setJobCommandConfigMap(commandConfig)
+            .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setTargetPartitionStates(Sets.newHashSet("SLAVE"))
+            .setDisableExternalView(true);
+
+    JobConfig.Builder job2 =
+        new JobConfig.Builder().setCommand("Reindex").setJobCommandConfigMap(commandConfig)
+            .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setTargetPartitionStates(Sets.newHashSet("MASTER"));
+
+    // enqueue both jobs
+    _driver.enqueueJob(queueName, "job1", job1);
+    _driver.enqueueJob(queueName, "job2", job2);
+
+    WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+    String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+
+    // ensure job1 is started
+    String namedSpaceJob1 = String.format("%s_%s", scheduledQueue, "job1");
+    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob1, TaskState.IN_PROGRESS,
+        TaskState.COMPLETED);
+
+    PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
+    // verify external view for job1 does not exist
+    ExternalView externalView = _accessor.getProperty(keyBuilder.externalView(namedSpaceJob1));
+    Assert.assertNull(externalView, "External View for " + namedSpaceJob1 + " shoudld not exist!");
+
+    // ensure job2 is completed
+    String namedSpaceJob2 = String.format("%s_%s", scheduledQueue, "job2");
+    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob2, TaskState.IN_PROGRESS,
+        TaskState.COMPLETED);
+
+    // verify external view for job2 exists
+    externalView = _accessor.getProperty(keyBuilder.externalView(namedSpaceJob2));
+    Assert.assertNotNull(externalView, "Can not find external View for " + namedSpaceJob2 + "!");
+  }
+
+
   private void verifyJobDeleted(String queueName, String jobName) throws Exception {
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();

http://git-wip-us.apache.org/repos/asf/helix/blob/b72ff29d/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
index 5c081a4..d40ac89 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
@@ -18,7 +18,7 @@ package org.apache.helix.integration.task;
  * specific language governing permissions and limitations
  * under the License.
  */
-
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -62,8 +62,16 @@ public class TestUtil {
     Assert.assertEquals(ctx.getWorkflowState(), state);
   }
 
+  /**
+   * Poll for the job until it reaches any one of the states in targetStates.
+   * @param manager
+   * @param workflowResource
+   * @param jobName
+   * @param targetStates
+   * @throws InterruptedException
+   */
   public static void pollForJobState(HelixManager manager, String workflowResource, String jobName,
-      TaskState state) throws InterruptedException {
+      TaskState... targetStates) throws InterruptedException {
     // Get workflow config
     WorkflowConfig wfCfg = TaskUtil.getWorkflowCfg(manager, workflowResource);
     Assert.assertNotNull(wfCfg);
@@ -81,16 +89,17 @@ public class TestUtil {
       jobName = String.format("%s_%s", workflowResource, jobName);
     }
 
+    Set<TaskState> allowedStates = new HashSet<TaskState>(Arrays.asList(targetStates));
     // Wait for state
     long st = System.currentTimeMillis();
     do {
       Thread.sleep(100);
       ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
     }
-    while ((ctx == null || ctx.getJobState(jobName) == null || ctx.getJobState(jobName) != state)
+    while ((ctx == null || ctx.getJobState(jobName) == null || !allowedStates.contains(ctx.getJobState(jobName)))
         && System.currentTimeMillis() < st + _default_timeout);
     Assert.assertNotNull(ctx);
-    Assert.assertEquals(ctx.getJobState(jobName), state);
+    Assert.assertTrue(allowedStates.contains(ctx.getJobState(jobName)));
   }
 
   public static void pollForEmptyJobState(final HelixManager manager, final String workflowName,