You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/10 19:05:33 UTC

[50/50] [abbrv] git commit: Merge remote-tracking branch 'origin/helix-provisioning'

Merge remote-tracking branch 'origin/helix-provisioning'


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/713586c4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/713586c4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/713586c4

Branch: refs/heads/master
Commit: 713586c4282feb47e411f66d43af60132ec54e64
Parents: 884e071 0f79187
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu Jul 10 10:02:44 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Jul 10 10:02:44 2014 -0700

----------------------------------------------------------------------
 helix-core/pom.xml                              |   4 +
 .../java/org/apache/helix/HelixService.java     |   8 +-
 .../java/org/apache/helix/api/Controller.java   |   4 +-
 .../java/org/apache/helix/api/Participant.java  |  14 +-
 .../java/org/apache/helix/api/Resource.java     |  34 +-
 .../helix/api/accessor/ClusterAccessor.java     |  11 +
 .../helix/api/accessor/ParticipantAccessor.java |  15 +-
 .../helix/api/accessor/ResourceAccessor.java    |  14 +-
 .../helix/api/config/ContainerConfig.java       |  66 ++
 .../apache/helix/api/config/ResourceConfig.java |  45 +-
 .../controller/GenericHelixController.java      |   2 +
 .../context/ControllerContextProvider.java      |   6 +-
 .../controller/provisioner/ContainerId.java     |  49 ++
 .../provisioner/ContainerProvider.java          |  36 ++
 .../controller/provisioner/ContainerSpec.java   |  65 ++
 .../controller/provisioner/ContainerState.java  |  34 ++
 .../controller/provisioner/Provisioner.java     |  42 ++
 .../provisioner/ProvisionerConfig.java          |  31 +
 .../controller/provisioner/ProvisionerRef.java  | 100 +++
 .../controller/provisioner/ServiceConfig.java   |  24 +
 .../controller/provisioner/TargetProvider.java  |  39 ++
 .../provisioner/TargetProviderResponse.java     |  68 +++
 .../stages/ContainerProvisioningStage.java      | 332 ++++++++++
 .../stages/ResourceComputationStage.java        |   1 +
 .../strategy/AutoRebalanceStrategy.java         |   4 +-
 .../knapsack/AbstractBaseKnapsackSolver.java    |  51 ++
 .../knapsack/AbstractKnapsackPropagator.java    | 123 ++++
 .../strategy/knapsack/BaseKnapsackSolver.java   |  68 +++
 .../strategy/knapsack/KnapsackAssignment.java   |  40 ++
 .../KnapsackCapacityPropagatorImpl.java         | 237 +++++++
 .../knapsack/KnapsackGenericSolverImpl.java     | 288 +++++++++
 .../strategy/knapsack/KnapsackItem.java         |  52 ++
 .../strategy/knapsack/KnapsackPropagator.java   |  80 +++
 .../strategy/knapsack/KnapsackSearchNode.java   |  81 +++
 .../knapsack/KnapsackSearchNodeImpl.java        |  96 +++
 .../strategy/knapsack/KnapsackSearchPath.java   |  58 ++
 .../knapsack/KnapsackSearchPathImpl.java        |  84 +++
 .../strategy/knapsack/KnapsackSolver.java       |  79 +++
 .../strategy/knapsack/KnapsackSolverImpl.java   | 210 +++++++
 .../strategy/knapsack/KnapsackState.java        |  61 ++
 .../strategy/knapsack/KnapsackStateImpl.java    |  80 +++
 .../helix/healthcheck/DecayAggregationType.java |   3 +-
 .../DefaultControllerMessageHandlerFactory.java |   7 +-
 ...ltParticipantErrorMessageHandlerFactory.java |   7 +-
 .../DefaultSchedulerMessageHandlerFactory.java  |   7 +-
 .../manager/zk/HelixConnectionAdaptor.java      |   3 +-
 .../apache/helix/manager/zk/ZKHelixAdmin.java   |   5 +-
 .../helix/manager/zk/ZkBaseDataAccessor.java    |   7 +-
 .../manager/zk/ZkCacheBaseDataAccessor.java     |   5 +-
 .../helix/manager/zk/ZkCallbackHandler.java     |  31 +-
 .../helix/manager/zk/ZkHelixAutoController.java |   4 +-
 .../helix/manager/zk/ZkHelixController.java     |  13 +-
 .../helix/manager/zk/ZkHelixParticipant.java    |   4 +-
 .../handling/AsyncCallbackService.java          |  10 +-
 .../handling/HelixStateTransitionHandler.java   |   2 +
 .../helix/model/ClusterConfiguration.java       |  52 +-
 .../java/org/apache/helix/model/IdealState.java |  16 +-
 .../org/apache/helix/model/InstanceConfig.java  |  80 ++-
 .../java/org/apache/helix/model/Message.java    |   3 +-
 .../helix/model/ProvisionerConfigHolder.java    | 184 ++++++
 .../helix/model/ResourceConfiguration.java      |  68 ++-
 .../participant/AbstractParticipantService.java | 142 +++++
 .../helix/participant/StateMachineEngine.java   |   3 +-
 .../helix/task/FixedTargetTaskRebalancer.java   | 163 +++++
 .../helix/task/GenericTaskRebalancer.java       | 277 +++++++++
 .../java/org/apache/helix/task/JobConfig.java   | 381 ++++++++++++
 .../java/org/apache/helix/task/JobContext.java  | 248 ++++++++
 .../main/java/org/apache/helix/task/JobDag.java | 151 +++++
 .../org/apache/helix/task/ScheduleConfig.java   | 162 +++++
 .../java/org/apache/helix/task/TargetState.java |   8 +-
 .../apache/helix/task/TaskCallbackContext.java  |  67 ++
 .../java/org/apache/helix/task/TaskConfig.java  | 340 +++--------
 .../org/apache/helix/task/TaskConstants.java    |   4 +
 .../java/org/apache/helix/task/TaskContext.java | 135 ----
 .../java/org/apache/helix/task/TaskDag.java     | 152 -----
 .../java/org/apache/helix/task/TaskDriver.java  | 213 +++++--
 .../java/org/apache/helix/task/TaskFactory.java |   5 +-
 .../org/apache/helix/task/TaskRebalancer.java   | 611 ++++++++++++-------
 .../java/org/apache/helix/task/TaskRunner.java  |  18 +-
 .../org/apache/helix/task/TaskStateModel.java   |  46 +-
 .../helix/task/TaskStateModelFactory.java       |   8 +-
 .../java/org/apache/helix/task/TaskUtil.java    | 334 ++++++++--
 .../java/org/apache/helix/task/Workflow.java    | 243 +++++---
 .../org/apache/helix/task/WorkflowConfig.java   |  58 +-
 .../org/apache/helix/task/WorkflowContext.java  |  73 +--
 .../org/apache/helix/task/beans/JobBean.java    |  44 ++
 .../apache/helix/task/beans/ScheduleBean.java   |  32 +
 .../org/apache/helix/task/beans/TaskBean.java   |  17 +-
 .../apache/helix/task/beans/WorkflowBean.java   |   7 +-
 .../org/apache/helix/tools/ClusterSetup.java    |   6 +-
 .../helix/tools/StateModelConfigGenerator.java  |  54 ++
 .../org/apache/helix/util/StatusUpdateUtil.java |   5 +-
 .../java/org/apache/helix/ZkTestHelper.java     |   6 +-
 .../helix/controller/stages/BaseStageTest.java  |   2 +-
 .../stages/TestMsgSelectionStage.java           |   8 +-
 .../strategy/TestNewAutoRebalanceStrategy.java  |   2 +-
 .../helix/healthcheck/TestAlertFireHistory.java |   1 -
 .../helix/healthcheck/TestExpandAlert.java      |   3 +-
 .../helix/healthcheck/TestSimpleAlert.java      |   3 +-
 .../healthcheck/TestSimpleWildcardAlert.java    |   3 +-
 .../helix/healthcheck/TestStalenessAlert.java   |   3 +-
 .../helix/healthcheck/TestWildcardAlert.java    |   3 +-
 .../TestAddNodeAfterControllerStart.java        |   4 +-
 .../TestAddStateModelFactoryAfterConnect.java   |   4 +-
 .../helix/integration/TestAutoRebalance.java    |   3 +-
 .../integration/TestCleanupExternalView.java    |   8 +-
 .../helix/integration/TestHelixConnection.java  |   8 +-
 .../integration/TestLocalContainerProvider.java | 346 +++++++++++
 .../TestMessagePartitionStateMismatch.java      |   3 +-
 .../TestParticipantErrorMessage.java            |   9 +-
 .../integration/TestResetPartitionState.java    |   7 +-
 .../helix/integration/TestSharedConnection.java |  12 +-
 .../helix/integration/TestStandAloneCMMain.java |   3 +-
 .../integration/TestStateTransitionTimeout.java |   3 +-
 ...dAloneCMTestBaseWithPropertyServerCheck.java |  30 +-
 .../manager/ClusterDistributedController.java   |   3 +-
 .../task/TestIndependentTaskRebalancer.java     | 331 ++++++++++
 .../integration/task/TestTaskRebalancer.java    | 136 +++--
 .../task/TestTaskRebalancerStopResume.java      |  59 +-
 .../apache/helix/integration/task/TestUtil.java |  15 +-
 .../integration/task/WorkflowGenerator.java     |  74 ++-
 .../zk/TestZKPropertyTransferServer.java        |   6 +-
 .../manager/zk/TestZkHelixAutoController.java   |   4 +-
 .../helix/manager/zk/TestZkHelixController.java |   6 +-
 .../manager/zk/TestZkHelixParticipant.java      |   9 +-
 .../zk/TestZkManagerFlappingDetection.java      |   1 -
 .../handling/TestConfigThreadpoolSize.java      |   7 +-
 .../handling/TestHelixTaskExecutor.java         |   9 +-
 .../apache/helix/tools/TestHelixAdminCli.java   |   3 +-
 .../helix/examples/LogicalModelExample.java     |  12 +-
 helix-provisioning/.gitignore                   |  16 +
 helix-provisioning/DISCLAIMER                   |  15 +
 helix-provisioning/LICENSE                      | 273 +++++++++
 helix-provisioning/NOTICE                       |  30 +
 helix-provisioning/README.md                    |  35 ++
 helix-provisioning/pom.xml                      | 118 ++++
 helix-provisioning/src/assemble/assembly.xml    |  60 ++
 .../src/main/config/log4j.properties            |  31 +
 .../apache/helix/provisioning/AppConfig.java    |  35 ++
 .../helix/provisioning/ApplicationSpec.java     |  46 ++
 .../provisioning/ApplicationSpecFactory.java    |  28 +
 .../provisioning/ContainerAskResponse.java      |  36 ++
 .../provisioning/ContainerLaunchResponse.java   |  24 +
 .../provisioning/ContainerReleaseResponse.java  |  24 +
 .../provisioning/ContainerStopResponse.java     |  24 +
 .../helix/provisioning/HelixYarnUtil.java       |  61 ++
 .../helix/provisioning/ParticipantLauncher.java | 156 +++++
 .../helix/provisioning/ServiceConfig.java       |  32 +
 .../apache/helix/provisioning/TaskConfig.java   |  48 ++
 .../StatelessParticipantService.java            |  86 +++
 .../participant/StatelessServiceStateModel.java |  56 ++
 .../StatelessServiceStateModelFactory.java      |  39 ++
 .../provisioning/tools/ContainerAdmin.java      | 116 ++++
 .../tools/UpdateProvisionerConfig.java          | 106 ++++
 .../helix/provisioning/yarn/AppLauncher.java    | 580 ++++++++++++++++++
 .../provisioning/yarn/AppMasterConfig.java      | 130 ++++
 .../provisioning/yarn/AppMasterLauncher.java    | 213 +++++++
 .../yarn/AppStatusReportGenerator.java          | 103 ++++
 .../provisioning/yarn/FixedTargetProvider.java  |  39 ++
 .../yarn/GenericApplicationMaster.java          | 316 ++++++++++
 .../yarn/LaunchContainerRunnable.java           |  98 +++
 .../provisioning/yarn/NMCallbackHandler.java    | 103 ++++
 .../provisioning/yarn/RMCallbackHandler.java    | 150 +++++
 .../provisioning/yarn/YarnProvisioner.java      | 411 +++++++++++++
 .../yarn/YarnProvisionerConfig.java             |  73 +++
 .../src/main/resources/sample_application.yaml  |  42 ++
 helix-provisioning/src/test/conf/testng.xml     |  27 +
 pom.xml                                         |   6 +
 recipes/helloworld-provisioning-yarn/pom.xml    | 158 +++++
 recipes/helloworld-provisioning-yarn/run.sh     |   6 +
 .../src/assemble/assembly.xml                   |  60 ++
 .../src/main/config/log4j.properties            |  31 +
 .../yarn/example/HelloWordAppSpecFactory.java   |  48 ++
 .../yarn/example/HelloWorldService.java         |  56 ++
 .../yarn/example/HelloworldAppSpec.java         | 167 +++++
 .../main/resources/hello_world_app_spec.yaml    |  42 ++
 .../src/test/conf/testng.xml                    |  27 +
 recipes/jobrunner-yarn/pom.xml                  | 158 +++++
 recipes/jobrunner-yarn/run.sh                   |   6 +
 .../jobrunner-yarn/src/assemble/assembly.xml    |  60 ++
 .../src/main/config/log4j.properties            |  31 +
 .../yarn/example/JobRunnerMain.java             | 151 +++++
 .../helix/provisioning/yarn/example/MyTask.java |  72 +++
 .../yarn/example/MyTaskAppSpec.java             | 167 +++++
 .../yarn/example/MyTaskAppSpecFactory.java      |  47 ++
 .../yarn/example/MyTaskService.java             |  81 +++
 .../src/main/resources/dummy_job.yaml           |  36 ++
 .../src/main/resources/job_runner_app_spec.yaml |  41 ++
 recipes/jobrunner-yarn/src/test/conf/testng.xml |  27 +
 recipes/pom.xml                                 |   2 +
 190 files changed, 12263 insertions(+), 1400 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/pom.xml
----------------------------------------------------------------------
diff --cc helix-core/pom.xml
index 7ca41d7,0f1f2b9..a8414f4
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@@ -213,9 -217,9 +213,13 @@@ under the License
                <name>test-util</name>
              </program>
              <program>
 +              <mainClass>org.apache.helix.tools.ZkGrep</mainClass>
 +              <name>zkgrep</name>
 +            </program>
++            <program>
+               <mainClass>org.apache.helix.task.TaskDriver</mainClass>
+               <name>task-driver</name>
+             </program>
            </programs>
          </configuration>
        </plugin>

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index a41da5a,e653338..c85dd0b
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@@ -55,7 -54,7 +55,8 @@@ import org.apache.helix.api.id.SessionI
  import org.apache.helix.api.id.StateModelDefId;
  import org.apache.helix.controller.context.ControllerContext;
  import org.apache.helix.controller.context.ControllerContextHolder;
+ import org.apache.helix.controller.provisioner.ProvisionerConfig;
 +import org.apache.helix.controller.rebalancer.RebalancerRef;
  import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
  import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
  import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
@@@ -768,22 -700,6 +770,27 @@@ public class ClusterAccessor 
      if (idealState != null) {
        _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
      }
 +
 +    // Add resource user config
 +    if (resource.getUserConfig() != null) {
 +      ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
 +      configuration.setType(resource.getType());
 +      configuration.addNamespacedConfig(resource.getUserConfig());
 +      PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
 +      if (idealState == null
 +          && (partitionedConfig == null || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED)) {
 +        // only persist if this is not easily convertible to an ideal state
 +        configuration
 +            .addNamespacedConfig(new RebalancerConfigHolder(resource.getRebalancerConfig())
 +                .toNamespacedConfig());
 +      }
++      ProvisionerConfig provisionerConfig = resource.getProvisionerConfig();
++      if (provisionerConfig != null) {
++        configuration.addNamespacedConfig(new ProvisionerConfigHolder(provisionerConfig)
++            .toNamespacedConfig());
++      }
 +      _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
 +    }
      return true;
    }
  
@@@ -812,10 -728,14 +819,14 @@@
      BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
      if (baseAccessor != null) {
        boolean[] existsResults = baseAccessor.exists(paths, 0);
 -      int ind =0;
++      int ind = 0;
        for (boolean exists : existsResults) {
 -        
++
          if (!exists) {
 -          LOG.warn("Path does not exist:"+ paths.get(ind));
++          LOG.warn("Path does not exist:" + paths.get(ind));
            return false;
          }
+         ind = ind + 1;
        }
      }
      return true;

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index d0cc3ba,cf4c549..cb52e91
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@@ -43,9 -43,9 +43,10 @@@ import org.apache.helix.api.Resource
  import org.apache.helix.api.RunningInstance;
  import org.apache.helix.api.Scope;
  import org.apache.helix.api.State;
+ import org.apache.helix.api.config.ContainerConfig;
  import org.apache.helix.api.config.ParticipantConfig;
  import org.apache.helix.api.config.UserConfig;
 +import org.apache.helix.api.id.ClusterId;
  import org.apache.helix.api.id.MessageId;
  import org.apache.helix.api.id.ParticipantId;
  import org.apache.helix.api.id.PartitionId;

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index 310b457,0052871..7dde6ee
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@@ -39,7 -38,7 +39,8 @@@ import org.apache.helix.api.id.ClusterI
  import org.apache.helix.api.id.ParticipantId;
  import org.apache.helix.api.id.PartitionId;
  import org.apache.helix.api.id.ResourceId;
+ import org.apache.helix.controller.provisioner.ProvisionerConfig;
 +import org.apache.helix.controller.rebalancer.RebalancerRef;
  import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
  import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
  import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
@@@ -274,11 -260,13 +276,17 @@@ public class ResourceAccessor 
        // only persist if this is not easily convertible to an ideal state
        config.addNamespacedConfig(new RebalancerConfigHolder(resourceConfig.getRebalancerConfig())
            .toNamespacedConfig());
 +      config.setBucketSize(resourceConfig.getBucketSize());
 +      config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
 +    } else if (userConfig == null) {
 +      config = null;
      }
+     if (resourceConfig.getProvisionerConfig() != null) {
+       config.addNamespacedConfig(new ProvisionerConfigHolder(resourceConfig.getProvisionerConfig())
+           .toNamespacedConfig());
+     }
+     config.setBucketSize(resourceConfig.getBucketSize());
+     config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
      setConfiguration(resourceId, config, resourceConfig.getRebalancerConfig());
      return true;
    }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index c8b033e,b5fb23e..f9af914
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@@ -48,10 -45,9 +48,11 @@@ import org.apache.helix.api.id.SessionI
  import org.apache.helix.controller.pipeline.Pipeline;
  import org.apache.helix.controller.pipeline.PipelineRegistry;
  import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 +import org.apache.helix.controller.stages.ClusterDataCache;
  import org.apache.helix.controller.stages.ClusterEvent;
 +import org.apache.helix.controller.stages.ClusterEventBlockingQueue;
  import org.apache.helix.controller.stages.CompatibilityCheckStage;
+ import org.apache.helix.controller.stages.ContainerProvisioningStage;
  import org.apache.helix.controller.stages.CurrentStateComputationStage;
  import org.apache.helix.controller.stages.ExternalViewComputeStage;
  import org.apache.helix.controller.stages.MessageGenerationStage;
@@@ -201,7 -183,7 +202,8 @@@ public class GenericHelixController imp
        Pipeline rebalancePipeline = new Pipeline();
        rebalancePipeline.addStage(new CompatibilityCheckStage());
        rebalancePipeline.addStage(new ResourceComputationStage());
 +      rebalancePipeline.addStage(new ResourceValidationStage());
+       rebalancePipeline.addStage(new ContainerProvisioningStage());
        rebalancePipeline.addStage(new CurrentStateComputationStage());
        rebalancePipeline.addStage(new BestPossibleStateCalcStage());
        rebalancePipeline.addStage(new MessageGenerationStage());

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerId.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerId.java
index 0000000,49bc0fc..b42d881
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerId.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerId.java
@@@ -1,0 -1,30 +1,49 @@@
+ package org.apache.helix.controller.provisioner;
+ 
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import org.apache.helix.api.id.Id;
+ 
+ public class ContainerId extends Id {
+ 
+   String id;
+ 
+   private ContainerId(String containerId) {
+     this.id = containerId;
+   }
+ 
+   @Override
+   public String stringify() {
+     return id;
+   }
+ 
+   /**
+    * Get a concrete container id
+    * @param containerId string container identifier
+    * @return ContainerId, or null if containerId is null
+    */
+   public static ContainerId from(String containerId) {
+     if (containerId == null) {
+       return null;
+     }
+     return new ContainerId(containerId);
+   }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
index 0000000,ab3c46a..36ad7f9
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
@@@ -1,0 -1,65 +1,65 @@@
+ package org.apache.helix.controller.provisioner;
+ 
+ import org.apache.helix.api.id.ParticipantId;
+ 
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ public class ContainerSpec {
+   /**
+    * Some unique id representing the container.
+    */
+   ContainerId _containerId;
 -  
++
+   int _memory;
+ 
+   private ParticipantId _participantId;
+ 
+   public ContainerSpec(ParticipantId _participantId) {
+     this._participantId = _participantId;
+   }
+ 
+   public ContainerId getContainerId() {
+     return _containerId;
+   }
+ 
+   @Override
+   public String toString() {
+     return _participantId.toString();
+   }
 -  
 -  public void setMemory(int memory){
++
++  public void setMemory(int memory) {
+     _memory = memory;
+   }
+ 
 -  public int getMemory(){
++  public int getMemory() {
+     return _memory;
+   }
 -  
++
+   public static ContainerSpec from(String serialized) {
 -    //todo 
++    // todo
+     return null;
 -    //return new ContainerSpec(ContainerId.from(serialized));
++    // return new ContainerSpec(ContainerId.from(serialized));
+   }
+ 
+   public ParticipantId getParticipantId() {
+     return _participantId;
+   }
 -  
++
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/provisioner/ServiceConfig.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/provisioner/ServiceConfig.java
index 0000000,adccb2c..9370cfd
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ServiceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ServiceConfig.java
@@@ -1,0 -1,5 +1,24 @@@
+ package org.apache.helix.controller.provisioner;
+ 
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ public class ServiceConfig {
+ 
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/provisioner/TargetProvider.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/provisioner/TargetProvider.java
index 0000000,063d008..1e5957b
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/TargetProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/TargetProvider.java
@@@ -1,0 -1,42 +1,39 @@@
+ package org.apache.helix.controller.provisioner;
+ 
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ import java.util.Collection;
+ 
 -import org.apache.helix.HelixManager;
+ import org.apache.helix.api.Cluster;
+ import org.apache.helix.api.Participant;
 -import org.apache.helix.api.config.ResourceConfig;
+ import org.apache.helix.api.id.ResourceId;
+ 
+ public interface TargetProvider {
+ 
 -
+   /**
+    * @param cluster
+    * @param resourceId ResourceId name of the resource
+    * @param participants
+    * @return
+    */
+   TargetProviderResponse evaluateExistingContainers(Cluster cluster, ResourceId resourceId,
+       Collection<Participant> participants);
+ 
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index 0000000,ae433e0..25645d3
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@@ -1,0 -1,332 +1,332 @@@
+ package org.apache.helix.controller.stages;
+ 
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ import java.util.Collection;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.UUID;
+ 
+ import org.apache.helix.HelixAdmin;
+ import org.apache.helix.HelixDataAccessor;
+ import org.apache.helix.HelixManager;
+ import org.apache.helix.PropertyKey;
+ import org.apache.helix.api.Cluster;
+ import org.apache.helix.api.Participant;
+ import org.apache.helix.api.config.ContainerConfig;
+ import org.apache.helix.api.config.ResourceConfig;
+ import org.apache.helix.api.id.ParticipantId;
+ import org.apache.helix.api.id.ResourceId;
+ import org.apache.helix.controller.pipeline.AbstractBaseStage;
+ import org.apache.helix.controller.provisioner.ContainerId;
+ import org.apache.helix.controller.provisioner.ContainerProvider;
+ import org.apache.helix.controller.provisioner.ContainerSpec;
+ import org.apache.helix.controller.provisioner.ContainerState;
+ import org.apache.helix.controller.provisioner.Provisioner;
+ import org.apache.helix.controller.provisioner.ProvisionerConfig;
+ import org.apache.helix.controller.provisioner.ProvisionerRef;
+ import org.apache.helix.controller.provisioner.TargetProvider;
+ import org.apache.helix.controller.provisioner.TargetProviderResponse;
+ import org.apache.helix.model.InstanceConfig;
+ import org.apache.helix.model.Message;
+ import org.apache.helix.model.Message.MessageType;
+ import org.apache.log4j.Logger;
+ 
+ import com.google.common.util.concurrent.FutureCallback;
+ import com.google.common.util.concurrent.Futures;
+ import com.google.common.util.concurrent.ListenableFuture;
+ 
+ /**
+  * This stage will manage the container allocation/deallocation needed for a
+  * specific resource.<br/>
+  * It does the following <br/>
+  * From each resource's provisioner config, it gets a TargetProvider and ContainerProvider <br/>
+  * TargetProvider decides which containers to acquire, start, stop, or release
+  * for a resource <br/>
+  * ContainerProvider will provide the ability to allocate, deallocate, start,
+  * stop container <br/>
+  */
+ public class ContainerProvisioningStage extends AbstractBaseStage {
+   private static final Logger LOG = Logger.getLogger(ContainerProvisioningStage.class);
+ 
+   Map<ResourceId, Provisioner> _provisionerMap = new HashMap<ResourceId, Provisioner>();
+   Map<ResourceId, TargetProvider> _targetProviderMap = new HashMap<ResourceId, TargetProvider>();
+   Map<ResourceId, ContainerProvider> _containerProviderMap =
+       new HashMap<ResourceId, ContainerProvider>();
+ 
+   @Override
+   public void process(ClusterEvent event) throws Exception {
+     final HelixManager helixManager = event.getAttribute("helixmanager");
+     final Map<ResourceId, ResourceConfig> resourceMap =
+         event.getAttribute(AttributeName.RESOURCES.toString());
+     final HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
+     final HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
+     final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+     for (ResourceId resourceId : resourceMap.keySet()) {
+       ResourceConfig resourceConfig = resourceMap.get(resourceId);
+       ProvisionerConfig provisionerConfig = resourceConfig.getProvisionerConfig();
+       if (provisionerConfig != null) {
+         Provisioner provisioner;
+         provisioner = _provisionerMap.get(resourceId);
+ 
+         // instantiate and cache a provisioner if there isn't one already cached
+         if (provisioner == null) {
+           ProvisionerRef provisionerRef = provisionerConfig.getProvisionerRef();
+           if (provisionerRef != null) {
+             provisioner = provisionerRef.getProvisioner();
+           }
+           if (provisioner != null) {
+             provisioner.init(helixManager, resourceConfig);
+             _containerProviderMap.put(resourceId, provisioner.getContainerProvider());
+             _targetProviderMap.put(resourceId, provisioner.getTargetProvider());
+             _provisionerMap.put(resourceId, provisioner);
+           } else {
+             LOG.error("Resource " + resourceId + " does not have a valid provisioner class!");
+             break;
+           }
+         }
+         TargetProvider targetProvider = _targetProviderMap.get(resourceId);
+         ContainerProvider containerProvider = _containerProviderMap.get(resourceId);
 -        final Cluster cluster = event.getAttribute("ClusterDataCache");
++        final Cluster cluster = event.getAttribute("Cluster");
+         final Collection<Participant> participants = cluster.getParticipantMap().values();
+ 
+         // If a process died, we need to mark it as DISCONNECTED or if the process is ready, mark as
+         // CONNECTED
+         Map<ParticipantId, Participant> participantMap = cluster.getParticipantMap();
+         for (ParticipantId participantId : participantMap.keySet()) {
+           Participant participant = participantMap.get(participantId);
+           ContainerConfig config = participant.getContainerConfig();
+           if (config != null) {
+             ContainerState containerState = config.getState();
+             if (!participant.isAlive() && ContainerState.CONNECTED.equals(containerState)) {
+               // Need to mark as disconnected if process died
+               LOG.info("Participant " + participantId + " died, marking as DISCONNECTED");
+               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
+                   ContainerState.DISCONNECTED);
+             } else if (participant.isAlive() && ContainerState.CONNECTING.equals(containerState)) {
+               // Need to mark as connected only when the live instance is visible
+               LOG.info("Participant " + participantId + " is ready, marking as CONNECTED");
+               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
+                   ContainerState.CONNECTED);
+             } else if (!participant.isAlive() && ContainerState.HALTING.equals(containerState)) {
+               // Need to mark as halted only once the live instance is no longer visible
+               LOG.info("Participant " + participantId + " is has been killed, marking as HALTED");
+               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
+                   ContainerState.HALTED);
+             }
+           }
+         }
+ 
+         // Participants registered in helix
+         // Give those participants to targetprovider
+         // Provide the response that contains, new containerspecs, containers to be released,
+         // containers to be stopped
+         // call the respective provisioner to allocate and start the container.
+         // Each container is then started; its state may be changed from any place.
+         // The target provider is given the state of container and asked for its new state. For each
+         // state there is a corresponding handler function.
+ 
+         // TargetProvider should be stateless, given the state of cluster and existing participants
+         // it should return the same result
+         final TargetProviderResponse response =
+             targetProvider.evaluateExistingContainers(cluster, resourceId, participants);
+ 
+         // allocate new containers
+         for (final ContainerSpec spec : response.getContainersToAcquire()) {
+           final ParticipantId participantId = spec.getParticipantId();
+           List<String> instancesInCluster =
+               helixAdmin.getInstancesInCluster(cluster.getId().stringify());
+           if (!instancesInCluster.contains(participantId.stringify())) {
+             // create a new Participant, attach the container spec
+             InstanceConfig instanceConfig = new InstanceConfig(participantId);
+             instanceConfig.setContainerSpec(spec);
+             // create a helix_participant in ACQUIRING state
+             instanceConfig.setContainerState(ContainerState.ACQUIRING);
+             // create the helix participant and add it to cluster
+             helixAdmin.addInstance(cluster.getId().toString(), instanceConfig);
+           }
+           LOG.info("Allocating container for " + participantId);
+           ListenableFuture<ContainerId> future = containerProvider.allocateContainer(spec);
+           FutureCallback<ContainerId> callback = new FutureCallback<ContainerId>() {
+             @Override
+             public void onSuccess(ContainerId containerId) {
+               LOG.info("Container " + containerId + " acquired. Marking " + participantId);
+               InstanceConfig existingInstance =
+                   helixAdmin
+                       .getInstanceConfig(cluster.getId().toString(), participantId.toString());
+               existingInstance.setContainerId(containerId);
+               existingInstance.setContainerState(ContainerState.ACQUIRED);
+               accessor.updateProperty(keyBuilder.instanceConfig(participantId.toString()),
+                   existingInstance);
+             }
+ 
+             @Override
+             public void onFailure(Throwable t) {
+               LOG.error("Could not allocate a container for participant " + participantId, t);
+               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
+                   ContainerState.FAILED);
+             }
+           };
+           safeAddCallback(future, callback);
+         }
+ 
+         // start new containers
+         for (final Participant participant : response.getContainersToStart()) {
+           final InstanceConfig existingInstance =
+               helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
+                   .toString());
+           final ContainerId containerId = existingInstance.getContainerId();
+           existingInstance.setContainerState(ContainerState.CONNECTING);
+           accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
+               existingInstance);
+           // ask the container provider to start the container for this participant
+           LOG.info("Starting container " + containerId + " for " + participant.getId());
+           ListenableFuture<Boolean> future =
+               containerProvider.startContainer(containerId, participant);
+           FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
+             @Override
+             public void onSuccess(Boolean result) {
+               // Do nothing yet, need to wait for live instance
+               LOG.info("Container " + containerId + " started for " + participant.getId());
+             }
+ 
+             @Override
+             public void onFailure(Throwable t) {
+               LOG.error("Could not start container" + containerId + "for participant "
+                   + participant.getId(), t);
+               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
+                   ContainerState.FAILED);
+             }
+           };
+           safeAddCallback(future, callback);
+         }
+ 
+         // release containers
+         for (final Participant participant : response.getContainersToRelease()) {
+           // mark it as finalizing
+           final InstanceConfig existingInstance =
+               helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
+                   .toString());
+           final ContainerId containerId = existingInstance.getContainerId();
+           existingInstance.setContainerState(ContainerState.FINALIZING);
+           accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
+               existingInstance);
+           // remove the participant
+           LOG.info("Deallocating container " + containerId + " for " + participant.getId());
+           ListenableFuture<Boolean> future = containerProvider.deallocateContainer(containerId);
+           FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
+             @Override
+             public void onSuccess(Boolean result) {
+               LOG.info("Container " + containerId + " deallocated. Dropping " + participant.getId());
+               InstanceConfig existingInstance =
+                   helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
+                       .toString());
+               helixAdmin.dropInstance(cluster.getId().toString(), existingInstance);
+             }
+ 
+             @Override
+             public void onFailure(Throwable t) {
+               LOG.error("Could not deallocate container" + containerId + "for participant "
+                   + participant.getId(), t);
+               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
+                   ContainerState.FAILED);
+             }
+           };
+           safeAddCallback(future, callback);
+         }
+ 
+         // stop but don't remove
+         for (final Participant participant : response.getContainersToStop()) {
+           // disable the node first
+           final InstanceConfig existingInstance =
+               helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
+                   .toString());
+           final ContainerId containerId = existingInstance.getContainerId();
+           existingInstance.setInstanceEnabled(false);
+           existingInstance.setContainerState(ContainerState.HALTING);
+           accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
+               existingInstance);
+           // stop the container
+           LOG.info("Stopping container " + containerId + " for " + participant.getId());
+           ListenableFuture<Boolean> future = containerProvider.stopContainer(containerId);
+           FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
+             @Override
+             public void onSuccess(Boolean result) {
+               // Don't update the state here, wait for the live instance, but do send a shutdown
+               // message
+               LOG.info("Container " + containerId + " stopped for " + participant.getId());
+               if (participant.isAlive()) {
+                 Message message = new Message(MessageType.SHUTDOWN, UUID.randomUUID().toString());
+                 message.setTgtName(participant.getId().toString());
+                 message.setTgtSessionId(participant.getRunningInstance().getSessionId());
+                 message.setMsgId(message.getId());
+                 accessor.createProperty(
+                     keyBuilder.message(participant.getId().toString(), message.getId()), message);
+               }
+             }
+ 
+             @Override
+             public void onFailure(Throwable t) {
+               LOG.error(
+                   "Could not stop container" + containerId + "for participant "
+                       + participant.getId(), t);
+               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
+                   ContainerState.FAILED);
+             }
+           };
+           safeAddCallback(future, callback);
+         }
+       }
+     }
+   }
+ 
+   /**
+    * Update a participant with a new container state
+    * @param helixAdmin
+    * @param accessor
+    * @param keyBuilder
+    * @param cluster
+    * @param participantId
+    * @param state
+    */
+   private void updateContainerState(HelixAdmin helixAdmin, HelixDataAccessor accessor,
+       PropertyKey.Builder keyBuilder, Cluster cluster, ParticipantId participantId,
+       ContainerState state) {
+     InstanceConfig existingInstance =
+         helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString());
+     existingInstance.setContainerState(state);
+     existingInstance.setInstanceEnabled(state.equals(ContainerState.CONNECTED));
+     accessor.updateProperty(keyBuilder.instanceConfig(participantId.toString()), existingInstance);
+   }
+ 
+   /**
+    * Add a callback, invoking its onFailure handler if the add itself throws
+    * @param future the future to listen on
+    * @param callback the callback to invoke
+    */
+   private <T> void safeAddCallback(ListenableFuture<T> future, FutureCallback<T> callback) {
+     try {
+       Futures.addCallback(future, callback);
+     } catch (Throwable t) {
+       callback.onFailure(t);
+     }
+   }
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index 5a8de20,bff7e46..09b66c1
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@@ -510,13 -491,12 +510,13 @@@ public class AutoRebalanceStrategy 
     * @return The current assignments that do not conform to the preferred assignment
     */
    private Map<Replica, Node> computeExistingNonPreferredPlacement(
 -      Map<String, Map<String, String>> currentMapping) {
 +      Map<PartitionId, Map<ParticipantId, State>> currentMapping) {
      Map<Replica, Node> existingNonPreferredAssignment = new TreeMap<Replica, Node>();
      int count = countStateReplicas();
 -    for (String partition : currentMapping.keySet()) {
 -      Map<String, String> nodeStateMap = currentMapping.get(partition);
 -      for (String nodeId : nodeStateMap.keySet()) {
 +    for (PartitionId partition : currentMapping.keySet()) {
 +      Map<ParticipantId, State> nodeStateMap = currentMapping.get(partition);
++      nodeStateMap.keySet().retainAll(_nodeMap.keySet());
 +      for (ParticipantId nodeId : nodeStateMap.keySet()) {
-         nodeStateMap.keySet().retainAll(_nodeMap.keySet());
          Node node = _nodeMap.get(nodeId);
          boolean skip = false;
          for (Replica replica : node.preferred) {
@@@ -580,13 -560,12 +580,13 @@@
     * @return Assignments that conform to the preferred placement
     */
    private Map<Replica, Node> computeExistingPreferredPlacement(
 -      final Map<String, Map<String, String>> currentMapping) {
 +      final Map<PartitionId, Map<ParticipantId, State>> currentMapping) {
      Map<Replica, Node> existingPreferredAssignment = new TreeMap<Replica, Node>();
      int count = countStateReplicas();
 -    for (String partition : currentMapping.keySet()) {
 -      Map<String, String> nodeStateMap = currentMapping.get(partition);
 -      for (String nodeId : nodeStateMap.keySet()) {
 +    for (PartitionId partition : currentMapping.keySet()) {
 +      Map<ParticipantId, State> nodeStateMap = currentMapping.get(partition);
++      nodeStateMap.keySet().retainAll(_nodeMap.keySet());
 +      for (ParticipantId nodeId : nodeStateMap.keySet()) {
-         nodeStateMap.keySet().retainAll(_nodeMap.keySet());
          Node node = _nodeMap.get(nodeId);
          node.currentlyAssigned = node.currentlyAssigned + 1;
          // check if its in one of the preferred position

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/AbstractBaseKnapsackSolver.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/AbstractBaseKnapsackSolver.java
index 0000000,4d27bd7..a0de0e7
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/AbstractBaseKnapsackSolver.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/AbstractBaseKnapsackSolver.java
@@@ -1,0 -1,32 +1,51 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+ 
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ /**
+  * Common implementation of a knapsack solver<br/>
+  * <br/>
+  * Based on the C++ knapsack solver in Google's or-tools package.
+  */
+ public abstract class AbstractBaseKnapsackSolver implements BaseKnapsackSolver {
+   private final String _solverName;
+ 
+   /**
+    * Initialize the solver
+    * @param solverName the name of the solver
+    */
+   public AbstractBaseKnapsackSolver(final String solverName) {
+     _solverName = solverName;
+   }
+ 
+   @Override
+   public long[] getLowerAndUpperBoundWhenItem(int itemId, boolean isItemIn, long lowerBound,
+       long upperBound) {
+     return new long[] {
+         0L, Long.MAX_VALUE
+     };
+   }
+ 
+   @Override
+   public String getName() {
+     return _solverName;
+   }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/AbstractKnapsackPropagator.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/AbstractKnapsackPropagator.java
index 0000000,0663990..6f9de66
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/AbstractKnapsackPropagator.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/AbstractKnapsackPropagator.java
@@@ -1,0 -1,104 +1,123 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+ 
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.ArrayList;
+ 
+ /**
+  * Common implementation of a knapsack constraint satisfier<br/>
+  * <br/>
+  * Based on the C++ knapsack solver in Google's or-tools package.
+  */
+ public abstract class AbstractKnapsackPropagator implements KnapsackPropagator {
+   private ArrayList<KnapsackItem> _items;
+   private long _currentProfit;
+   private long _profitLowerBound;
+   private long _profitUpperBound;
+   private KnapsackState _state;
+ 
+   /**
+    * Initialize the propagator
+    * @param state the current knapsack state
+    */
+   public AbstractKnapsackPropagator(final KnapsackState state) {
+     _items = new ArrayList<KnapsackItem>();
+     _currentProfit = 0L;
+     _profitLowerBound = 0L;
+     _profitUpperBound = Long.MAX_VALUE;
+     _state = state;
+   }
+ 
+   @Override
+   public void init(ArrayList<Long> profits, ArrayList<Long> weights) {
+     final int numberOfItems = profits.size();
+     _items.clear();
+     for (int i = 0; i < numberOfItems; i++) {
+       _items.add(new KnapsackItem(i, weights.get(i), profits.get(i)));
+     }
+     _currentProfit = 0;
+     _profitLowerBound = Long.MIN_VALUE;
+     _profitUpperBound = Long.MAX_VALUE;
+     initPropagator();
+   }
+ 
+   @Override
+   public boolean update(boolean revert, KnapsackAssignment assignment) {
+     if (assignment.isIn) {
+       if (revert) {
+         _currentProfit -= _items.get(assignment.itemId).profit;
+       } else {
+         _currentProfit += _items.get(assignment.itemId).profit;
+       }
+     }
+     return updatePropagator(revert, assignment);
+   }
+ 
+   @Override
+   public long currentProfit() {
+     return _currentProfit;
+   }
+ 
+   @Override
+   public long profitLowerBound() {
+     return _profitLowerBound;
+   }
+ 
+   @Override
+   public long profitUpperBound() {
+     return _profitUpperBound;
+   }
+ 
+   @Override
+   public void copyCurrentStateToSolution(boolean hasOnePropagator, ArrayList<Boolean> solution) {
+     if (solution == null) {
+       throw new RuntimeException("solution cannot be null!");
+     }
+     for (KnapsackItem item : _items) {
+       final int itemId = item.id;
+       solution.set(itemId, _state.isBound(itemId) && _state.isIn(itemId));
+     }
+     if (hasOnePropagator) {
+       copyCurrentStateToSolutionPropagator(solution);
+     }
+   }
+ 
+   protected abstract void initPropagator();
+ 
+   protected abstract boolean updatePropagator(boolean revert, final KnapsackAssignment assignment);
+ 
+   protected abstract void copyCurrentStateToSolutionPropagator(ArrayList<Boolean> solution);
+ 
+   protected KnapsackState state() {
+     return _state;
+   }
+ 
+   protected ArrayList<KnapsackItem> items() {
+     return _items;
+   }
+ 
+   protected void setProfitLowerBound(long profit) {
+     _profitLowerBound = profit;
+   }
+ 
+   protected void setProfitUpperBound(long profit) {
+     _profitUpperBound = profit;
+   }
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/BaseKnapsackSolver.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/BaseKnapsackSolver.java
index 0000000,1d71a22..51221e5
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/BaseKnapsackSolver.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/BaseKnapsackSolver.java
@@@ -1,0 -1,49 +1,68 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+ 
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.ArrayList;
+ 
+ /**
+  * The interface of any multidimensional knapsack solver<br/>
+  * <br/>
+  * Based on the C++ knapsack solver in Google's or-tools package.
+  */
+ public interface BaseKnapsackSolver {
+   /**
+    * Initialize the solver
+    * @param profits profit of adding each item to the knapsack
+    * @param weights cost of adding each item in each dimension
+    * @param capacities maximum weight per dimension
+    */
+   void init(final ArrayList<Long> profits, final ArrayList<ArrayList<Long>> weights,
+       final ArrayList<Long> capacities);
+ 
+   /**
+    * Compute an upper and lower bound on the knapsack given the assignment state of the knapsack
+    * @param itemId the item id
+    * @param isItemIn true if the item is in the knapsack, false otherwise
+    * @param lowerBound the current lower bound
+    * @param upperBound the current upper bound
+    * @return the new lower and upper bounds
+    */
+   long[] getLowerAndUpperBoundWhenItem(int itemId, boolean isItemIn, long lowerBound,
+       long upperBound);
+ 
+   /**
+    * Solve the knapsack problem
+    * @return the (approximate) optimal profit
+    */
+   long solve();
+ 
+   /**
+    * Check if an item is in the final solution
+    * @param itemId the item id
+    * @return true if the item is present, false otherwise
+    */
+   boolean bestSolution(int itemId);
+ 
+   /**
+    * Get the solver name
+    * @return solver name
+    */
+   String getName();
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackAssignment.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackAssignment.java
index 0000000,bfd29d7..50f58a7
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackAssignment.java
@@@ -1,0 -1,21 +1,40 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+ 
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ /**
+  * The assignment of a knapsack item to a knapsack<br/>
+  * <br/>
+  * Based on the C++ knapsack solver in Google's or-tools package.
+  */
+ public class KnapsackAssignment {
+   public int itemId;
+   public boolean isIn;
+ 
+   /**
+    * Create the assignment
+    * @param itemId the item id
+    * @param isIn true if the item is in the knapsack, false otherwise
+    */
+   public KnapsackAssignment(int itemId, boolean isIn) {
+     this.itemId = itemId;
+     this.isIn = isIn;
+   }
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackCapacityPropagatorImpl.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackCapacityPropagatorImpl.java
index 0000000,357cc2a..e630b3c
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackCapacityPropagatorImpl.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackCapacityPropagatorImpl.java
@@@ -1,0 -1,218 +1,237 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+ 
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.Comparator;
+ 
+ /**
+  * A knapsack propagator that constrains assignments based on knapsack capacity for a given
+  * dimension<br/>
+  * <br/>
+  * The propagator keeps its items sorted by decreasing efficiency (profit to weight ratio) and
+  * derives profit bounds greedily: unbound items are packed in that order until one no longer
+  * fits (the "break item"). The profit of the packed items gives the lower bound; the upper bound
+  * adds an estimate of what the remaining capacity around the break item could still yield.<br/>
+  * <br/>
+  * Based on the C++ knapsack solver in Google's or-tools package.
+  */
+ public class KnapsackCapacityPropagatorImpl extends AbstractKnapsackPropagator {
+   private static final int NO_SELECTION = -1;
+ 
+   /**
+    * Largest sum of most-significant-bit positions for which two operands are still multiplied
+    * directly as 64-bit integers; above it the product is computed in floating point instead
+    */
+   private static final int MAX_SAFE_PRODUCT_BITS = 61;
+ 
+   /** Capacity of the knapsack for this dimension */
+   private final long _capacity;
+ 
+   /** Total weight of the items currently assigned "in" */
+   private long _consumedCapacity;
+ 
+   /** Id of the last unbound item visited by the most recent bound computation */
+   private int _breakItemId;
+ 
+   /** Items ordered by decreasing efficiency */
+   private ArrayList<KnapsackItem> _sortedItems;
+ 
+   /** One more than the largest item profit (at least 1); efficiency of non-positive weights */
+   private long _profitMax;
+ 
+   /**
+    * Initialize the propagator
+    * @param state the current knapsack state
+    * @param capacity the knapsack capacity for this dimension
+    */
+   public KnapsackCapacityPropagatorImpl(KnapsackState state, long capacity) {
+     super(state);
+     _capacity = capacity;
+     _consumedCapacity = 0L;
+     _breakItemId = NO_SELECTION;
+     _sortedItems = new ArrayList<KnapsackItem>();
+     _profitMax = 0L;
+   }
+ 
+   /**
+    * Recompute the profit lower and upper bounds for the current state. Unbound items are packed
+    * greedily in efficiency order; the first one that does not fit becomes the break item and the
+    * scan stops there. As a side effect this updates the item returned by
+    * {@link #getNextItemId()}.
+    */
+   @Override
+   public void computeProfitBounds() {
+     setProfitLowerBound(currentProfit());
+     _breakItemId = NO_SELECTION;
+ 
+     long remainingCapacity = _capacity - _consumedCapacity;
+     int breakSortedIndex = NO_SELECTION;
+     final int numberOfSortedItems = _sortedItems.size();
+     for (int sortedIndex = 0; sortedIndex < numberOfSortedItems; sortedIndex++) {
+       final KnapsackItem item = _sortedItems.get(sortedIndex);
+       if (state().isBound(item.id)) {
+         continue;
+       }
+       // Remember every unbound item visited, so that after the loop this is either the break
+       // item or, when everything fits, the last unbound item.
+       _breakItemId = item.id;
+       if (remainingCapacity < item.weight) {
+         breakSortedIndex = sortedIndex;
+         break;
+       }
+       remainingCapacity -= item.weight;
+       setProfitLowerBound(profitLowerBound() + item.profit);
+     }
+ 
+     setProfitUpperBound(profitLowerBound());
+     if (breakSortedIndex != NO_SELECTION) {
+       final long additionalProfit = getAdditionalProfit(remainingCapacity, breakSortedIndex);
+       setProfitUpperBound(profitUpperBound() + additionalProfit);
+     }
+   }
+ 
+   /**
+    * Get the item to branch on next
+    * @return the id recorded by the last call to {@link #computeProfitBounds()}, or -1 if no
+    *         unbound item was found (or bounds have not been computed since initialization)
+    */
+   @Override
+   public int getNextItemId() {
+     return _breakItemId;
+   }
+ 
+   /**
+    * Reset the consumed capacity and rebuild the efficiency-sorted copy of the items
+    */
+   @Override
+   protected void initPropagator() {
+     _consumedCapacity = 0L;
+     _breakItemId = NO_SELECTION;
+     _sortedItems = new ArrayList<KnapsackItem>(items());
+     _profitMax = 0L;
+     for (KnapsackItem item : _sortedItems) {
+       _profitMax = Math.max(_profitMax, item.profit);
+     }
+     // Strictly larger than any single profit, so items with a non-positive weight sort first.
+     _profitMax++;
+     Collections.sort(_sortedItems, new KnapsackItemDecreasingEfficiencyComparator(_profitMax));
+   }
+ 
+   /**
+    * Account for an assignment being applied or reverted. Only "in" assignments change the
+    * consumed capacity.
+    * @param revert true to undo the assignment, false to apply it
+    * @param assignment the assignment to process
+    * @return false if applying the assignment pushes the consumed capacity above the capacity
+    *         (the weight stays accounted for, so a later revert restores it), true otherwise
+    */
+   @Override
+   protected boolean updatePropagator(boolean revert, KnapsackAssignment assignment) {
+     if (!assignment.isIn) {
+       return true;
+     }
+     final long weight = items().get(assignment.itemId).weight;
+     if (revert) {
+       _consumedCapacity -= weight;
+       return true;
+     }
+     _consumedCapacity += weight;
+     return _consumedCapacity <= _capacity;
+   }
+ 
+   /**
+    * Mark in the solution every unbound item that the greedy packing would add, i.e. the same
+    * items that contribute to the lower bound in {@link #computeProfitBounds()}. Stops at the
+    * first unbound item that does not fit.
+    * @param solution the solution vector, indexed by item id
+    * @throws IllegalArgumentException if solution is null
+    */
+   @Override
+   protected void copyCurrentStateToSolutionPropagator(ArrayList<Boolean> solution) {
+     if (solution == null) {
+       throw new IllegalArgumentException("solution cannot be null!");
+     }
+     long remainingCapacity = _capacity - _consumedCapacity;
+     for (KnapsackItem item : _sortedItems) {
+       if (state().isBound(item.id)) {
+         continue;
+       }
+       if (remainingCapacity < item.weight) {
+         return;
+       }
+       remainingCapacity -= item.weight;
+       solution.set(item.id, true);
+     }
+   }
+ 
+   /**
+    * Estimate how much profit can still be gained beyond the greedy packing
+    * @param remainingCapacity capacity left after the greedy packing
+    * @param breakSortedIndex position of the break item in the efficiency-sorted list (an index
+    *          into the sorted list, not an item id)
+    * @return the larger of the two estimates: leaving the break item out, or forcing it in
+    */
+   private long getAdditionalProfit(long remainingCapacity, int breakSortedIndex) {
+     // Estimate 1: leave the break item out and fill the remaining capacity at the profit/weight
+     // rate of the item that follows it.
+     // NOTE(review): the following item's weight is used as a divisor without a zero check. With
+     // non-negative weights and a non-negative remaining capacity, zero-weight items sort ahead
+     // of the break item, so this presumably cannot be zero - confirm for other inputs.
+     long additionalProfitWhenNoBreakItem = 0L;
+     final int afterBreakIndex = breakSortedIndex + 1;
+     if (afterBreakIndex < _sortedItems.size()) {
+       final KnapsackItem next = _sortedItems.get(afterBreakIndex);
+       additionalProfitWhenNoBreakItem =
+           upperBoundOfRatio(remainingCapacity, next.profit, next.weight);
+     }
+ 
+     // Estimate 2: force the break item in and give back the capacity it overuses at the
+     // profit/weight rate of the item that precedes it.
+     long additionalProfitWhenBreakItem = 0L;
+     final int beforeBreakIndex = breakSortedIndex - 1;
+     if (beforeBreakIndex >= 0) {
+       final KnapsackItem previous = _sortedItems.get(beforeBreakIndex);
+       if (previous.weight != 0) {
+         final KnapsackItem breakItem = _sortedItems.get(breakSortedIndex);
+         final long overusedCapacity = breakItem.weight - remainingCapacity;
+         final long ratio = upperBoundOfRatio(overusedCapacity, previous.profit, previous.weight);
+ 
+         additionalProfitWhenBreakItem = breakItem.profit - ratio;
+       }
+     }
+ 
+     return Math.max(additionalProfitWhenNoBreakItem, additionalProfitWhenBreakItem);
+   }
+ 
+   /**
+    * Get the zero-based position of the highest set bit of a 64-bit value
+    * @param n the value to inspect
+    * @return the bit position (63 for any negative value), or 0 when n is 0
+    */
+   private int mostSignificantBitsPosition64(long n) {
+     return (n == 0L) ? 0 : (Long.SIZE - 1) - Long.numberOfLeadingZeros(n);
+   }
+ 
+   /**
+    * Conservatively decide whether multiplying two values could overflow a signed 64-bit integer
+    * @param value1 first factor
+    * @param value2 second factor
+    * @return true if the most-significant-bit positions of the two factors sum to more than 61
+    */
+   private boolean willProductOverflow(long value1, long value2) {
+     final int mostSignificantBitsPosition1 = mostSignificantBitsPosition64(value1);
+     final int mostSignificantBitsPosition2 = mostSignificantBitsPosition64(value2);
+     return mostSignificantBitsPosition1 + mostSignificantBitsPosition2 > MAX_SAFE_PRODUCT_BITS;
+   }
+ 
+   /**
+    * Compute (numerator1 * numerator2) / denominator. When the product fits in 64 bits the result
+    * is the integer quotient; otherwise it is computed in floating point and rounded to the
+    * nearest integer.
+    * @param numerator1 first factor of the numerator
+    * @param numerator2 second factor of the numerator
+    * @param denominator the divisor; must be non-zero, since the integer path divides by it
+    *          directly
+    * @return the ratio
+    */
+   private long upperBoundOfRatio(long numerator1, long numerator2, long denominator) {
+     if (!willProductOverflow(numerator1, numerator2)) {
+       final long numerator = numerator1 * numerator2;
+       return numerator / denominator;
+     }
+     final double ratio = (((double) numerator1) * ((double) numerator2)) / ((double) denominator);
+     return (long) Math.floor(ratio + 0.5);
+   }
+ 
+   /**
+    * A special comparator that orders knapsack items by decreasing efficiency (profit to weight
+    * ratio)
+    */
+   private static class KnapsackItemDecreasingEfficiencyComparator implements
+       Comparator<KnapsackItem> {
+     private final long _profitMax;
+ 
+     public KnapsackItemDecreasingEfficiencyComparator(long profitMax) {
+       _profitMax = profitMax;
+     }
+ 
+     @Override
+     public int compare(KnapsackItem item1, KnapsackItem item2) {
+       final double eff1 = item1.getEfficiency(_profitMax);
+       final double eff2 = item2.getEfficiency(_profitMax);
+       // Arguments are swapped on purpose: the more efficient item must sort first.
+       return Double.compare(eff2, eff1);
+     }
+ 
+   }
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackGenericSolverImpl.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackGenericSolverImpl.java
index 0000000,1bf1d3f..dec9b2d
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackGenericSolverImpl.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackGenericSolverImpl.java
@@@ -1,0 -1,269 +1,288 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+ 
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.ArrayList;
+ import java.util.Comparator;
+ import java.util.PriorityQueue;
+ 
+ /**
+  * A generic knapsack solver that supports multiple dimensions<br/>
+  * <br/>
+  * The solver keeps one capacity propagator per dimension and explores include/exclude decisions
+  * for one item at a time. Candidate search nodes wait in a priority queue ordered by decreasing
+  * profit upper bound; the search stops once the most promising queued node cannot beat the best
+  * profit found so far. The state and the propagators are shared and mutated in place, so moving
+  * between search nodes is done by reverting and re-applying assignments along a path.<br/>
+  * <br/>
+  * Based on the C++ knapsack solver in Google's or-tools package.
+  */
+ public class KnapsackGenericSolverImpl extends AbstractBaseKnapsackSolver {
+   private static final int MASTER_PROPAGATOR_ID = 0;
+   private static final int NO_SELECTION = -1;
+ 
+   // One propagator per dimension, in the order of the weights passed to init()
+   private ArrayList<KnapsackPropagator> _propagators;
+   // Index into _propagators of the propagator that supplies the current profit, the next item to
+   // branch on and the best-solution contents
+   private int _masterPropagatorId;
+   // Every node retained by the search (the root plus each node accepted by makeNewNode)
+   private ArrayList<KnapsackSearchNode> _searchNodes;
+   // Item assignment state shared with all propagators
+   private KnapsackState _state;
+   // Best profit found so far by solve()
+   private long _bestSolutionProfit;
+   // Indexed by item id: whether the item is part of the best solution found so far
+   private ArrayList<Boolean> _bestSolution;
+ 
+   /**
+    * Create the solver
+    * @param solverName name of the solver
+    */
+   public KnapsackGenericSolverImpl(String solverName) {
+     super(solverName);
+     _propagators = new ArrayList<KnapsackPropagator>();
+     _masterPropagatorId = MASTER_PROPAGATOR_ID;
+     _searchNodes = new ArrayList<KnapsackSearchNode>();
+     _state = new KnapsackStateImpl();
+     _bestSolutionProfit = 0L;
+     _bestSolution = new ArrayList<Boolean>();
+   }
+ 
+   /**
+    * Set up the solver for a new problem, discarding propagators and search nodes from any
+    * previous one. One propagator is created per entry of weights.
+    * @param profits profit of each item
+    * @param weights for each dimension, the weight of each item
+    * @param capacities capacity of each dimension; read at the same index as the matching entry
+    *          of weights, so it needs at least as many entries as weights
+    */
+   @Override
+   public void init(ArrayList<Long> profits, ArrayList<ArrayList<Long>> weights,
+       ArrayList<Long> capacities) {
+     clear();
+     final int numberOfItems = profits.size();
+     final int numberOfDimensions = weights.size();
+     _state.init(numberOfItems);
+ 
+     // Start from a solution that contains no item
+     _bestSolution.clear();
+     for (int i = 0; i < numberOfItems; i++) {
+       _bestSolution.add(false);
+     }
+ 
+     for (int i = 0; i < numberOfDimensions; i++) {
+       KnapsackPropagator propagator = new KnapsackCapacityPropagatorImpl(_state, capacities.get(i));
+       propagator.init(profits, weights.get(i));
+       _propagators.add(propagator);
+     }
+     _masterPropagatorId = MASTER_PROPAGATOR_ID;
+   }
+ 
+   /**
+    * Get the number of items in the problem
+    * @return the number of items reported by the knapsack state
+    */
+   public int getNumberOfItems() {
+     return _state.getNumberOfItems();
+   }
+ 
+   /**
+    * Compute profit bounds under the hypothesis that an item is forced in or out. The assignment
+    * is applied, the bounds are read, and the assignment is reverted again before returning.
+    * @param itemId the item to test
+    * @param isItemIn true to test the item in the knapsack, false to test it left out
+    * @param lowerBound initial lower bound; always overwritten before the result is returned
+    * @param upperBound initial upper bound; always overwritten before the result is returned
+    * @return a two-element array: [0] is the master propagator's lower bound when there is
+    *         exactly one propagator and 0 otherwise, [1] is the aggregated upper bound; both are
+    *         0 if applying or reverting the assignment fails
+    */
+   @Override
+   public long[] getLowerAndUpperBoundWhenItem(int itemId, boolean isItemIn, long lowerBound,
+       long upperBound) {
+     long[] result = {
+         lowerBound, upperBound
+     };
+     KnapsackAssignment assignment = new KnapsackAssignment(itemId, isItemIn);
+     final boolean fail = !incrementalUpdate(false, assignment);
+     if (fail) {
+       result[0] = 0L;
+       result[1] = 0L;
+     } else {
+       result[0] =
+           (hasOnePropagator()) ? _propagators.get(_masterPropagatorId).profitLowerBound() : 0L;
+       result[1] = getAggregatedProfitUpperBound();
+     }
+ 
+     // Always undo the tentative assignment, even if applying it failed
+     final boolean failRevert = !incrementalUpdate(true, assignment);
+     if (failRevert) {
+       result[0] = 0L;
+       result[1] = 0L;
+     }
+     return result;
+   }
+ 
+   /**
+    * Choose which propagator drives the search
+    * @param masterPropagatorId index of the propagator in creation order; not range-checked here
+    */
+   public void setMasterPropagatorId(int masterPropagatorId) {
+     _masterPropagatorId = masterPropagatorId;
+   }
+ 
+   /**
+    * Run the search and record the best solution found
+    * @return the best profit found
+    * @throws RuntimeException if moving the propagators from one search node to another fails
+    */
+   @Override
+   public long solve() {
+     _bestSolutionProfit = 0L;
+     PriorityQueue<KnapsackSearchNode> searchQueue =
+         new PriorityQueue<KnapsackSearchNode>(11,
+             new KnapsackSearchNodeInDecreasingUpperBoundComparator());
+     // The root carries a placeholder assignment (no item selected) and is never queued itself
+     KnapsackAssignment assignment = new KnapsackAssignment(NO_SELECTION, true);
+     KnapsackSearchNode rootNode = new KnapsackSearchNodeImpl(null, assignment);
+     rootNode.setCurrentProfit(getCurrentProfit());
+     rootNode.setProfitUpperBound(getAggregatedProfitUpperBound());
+     rootNode.setNextItemId(getNextItemId());
+     _searchNodes.add(rootNode);
+ 
+     // When makeNewNode returns true it has just appended the new node to _searchNodes, so the
+     // last element is the node to queue
+     if (makeNewNode(rootNode, false)) {
+       searchQueue.add(_searchNodes.get(_searchNodes.size() - 1));
+     }
+     if (makeNewNode(rootNode, true)) {
+       searchQueue.add(_searchNodes.get(_searchNodes.size() - 1));
+     }
+ 
+     // currentNode is the node whose assignments the state and propagators currently reflect
+     KnapsackSearchNode currentNode = rootNode;
+     while (!searchQueue.isEmpty() && searchQueue.peek().profitUpperBound() > _bestSolutionProfit) {
+       KnapsackSearchNode node = searchQueue.poll();
+ 
+       // TODO: check if equality is enough
+       // NOTE(review): this is a reference comparison. The root is never queued and a polled
+       // node is not re-queued, so within this method the two references look like they always
+       // differ - confirm before relying on the branch being skippable.
+       if (node != currentNode) {
+         // Move the state and propagators from the previously expanded node to this one
+         KnapsackSearchPath path = new KnapsackSearchPathImpl(currentNode, node);
+         path.init();
+         final boolean noFail = updatePropagators(path);
+         currentNode = node;
+         if (!noFail) {
+           throw new RuntimeException("solver failed to update propagators");
+         }
+       }
+ 
+       // Branch on the node's next item: one child without it, one child with it
+       if (makeNewNode(node, false)) {
+         searchQueue.add(_searchNodes.get(_searchNodes.size() - 1));
+       }
+       if (makeNewNode(node, true)) {
+         searchQueue.add(_searchNodes.get(_searchNodes.size() - 1));
+       }
+     }
+     return _bestSolutionProfit;
+   }
+ 
+   /**
+    * Check whether an item is part of the best solution found so far
+    * @param itemId the item id
+    * @return true if the item is in the best solution, false otherwise
+    */
+   @Override
+   public boolean bestSolution(int itemId) {
+     return _bestSolution.get(itemId);
+   }
+ 
+   /**
+    * Drop all propagators and retained search nodes. The state, the best profit and the best
+    * solution are not touched here.
+    */
+   private void clear() {
+     _propagators.clear();
+     _searchNodes.clear();
+   }
+ 
+   /**
+    * Move the state and propagators along a search path: revert every assignment from the path's
+    * "from" node up to its "via" node, then apply every assignment from its "to" node up to the
+    * same "via" node. All steps are executed even after one of them fails.
+    * @param path the path to follow; presumably "via" is a common ancestor of "from" and "to" -
+    *          TODO confirm against KnapsackSearchPathImpl
+    * @return true if every revert and apply step succeeded, false otherwise
+    */
+   private boolean updatePropagators(final KnapsackSearchPath path) {
+     boolean noFail = true;
+     KnapsackSearchNode node = path.from();
+     KnapsackSearchNode via = path.via();
+     while (node != via) {
+       noFail = incrementalUpdate(true, node.assignment()) && noFail;
+       node = node.parent();
+     }
+     node = path.to();
+     while (node != via) {
+       noFail = incrementalUpdate(false, node.assignment()) && noFail;
+       node = node.parent();
+     }
+     return noFail;
+   }
+ 
+   /**
+    * Apply or revert a single assignment on the state and on every propagator
+    * @param revert true to revert the assignment, false to apply it
+    * @param assignment the assignment to process
+    * @return true if the state and all propagators accepted the update, false otherwise
+    */
+   private boolean incrementalUpdate(boolean revert, final KnapsackAssignment assignment) {
+     boolean noFail = _state.updateState(revert, assignment);
+     for (KnapsackPropagator propagator : _propagators) {
+       // update() is the left operand so it runs for every propagator, even after a failure
+       noFail = propagator.update(revert, assignment) && noFail;
+     }
+     return noFail;
+   }
+ 
+   /**
+    * Record the current state as the best solution if it improves on the best profit so far. With
+    * a single propagator its profit lower bound is used as the candidate profit; with several,
+    * the master propagator's current profit is used.
+    */
+   private void updateBestSolution() {
+     final long profitLowerBound =
+         (hasOnePropagator()) ? _propagators.get(_masterPropagatorId).profitLowerBound()
+             : _propagators.get(_masterPropagatorId).currentProfit();
+ 
+     if (_bestSolutionProfit < profitLowerBound) {
+       _bestSolutionProfit = profitLowerBound;
+       _propagators.get(_masterPropagatorId).copyCurrentStateToSolution(hasOnePropagator(),
+           _bestSolution);
+     }
+   }
+ 
+   /**
+    * Try to create the child of a node that puts the node's next item in or out of the knapsack.
+    * The assignment is applied to evaluate the child and then reverted, so the state and
+    * propagators are back at the parent node when this method returns.
+    * @param node the parent node; the state and propagators must currently reflect it
+    * @param isIn true to place the next item in the knapsack, false to leave it out
+    * @return true if a child was appended to the retained search nodes; false if the parent has
+    *         no next item, the assignment failed, or the child's upper bound is below the best
+    *         profit found so far
+    */
+   private boolean makeNewNode(final KnapsackSearchNode node, boolean isIn) {
+     if (node.nextItemId() == NO_SELECTION) {
+       return false;
+     }
+     KnapsackAssignment assignment = new KnapsackAssignment(node.nextItemId(), isIn);
+     // Scratch node, used only to apply the assignment and capture the resulting figures
+     KnapsackSearchNode newNode = new KnapsackSearchNodeImpl(node, assignment);
+ 
+     KnapsackSearchPath path = new KnapsackSearchPathImpl(node, newNode);
+     path.init();
+     final boolean noFail = updatePropagators(path);
+     if (noFail) {
+       newNode.setCurrentProfit(getCurrentProfit());
+       newNode.setProfitUpperBound(getAggregatedProfitUpperBound());
+       newNode.setNextItemId(getNextItemId());
+       updateBestSolution();
+     }
+ 
+     // Revert so that another child can be created from the same parent
+     KnapsackSearchPath revertPath = new KnapsackSearchPathImpl(newNode, node);
+     revertPath.init();
+     updatePropagators(revertPath);
+ 
+     if (!noFail || newNode.profitUpperBound() < _bestSolutionProfit) {
+       return false;
+     }
+ 
+     // The child is worth keeping: retain a fresh copy of it
+     KnapsackSearchNode relevantNode = new KnapsackSearchNodeImpl(node, assignment);
+     relevantNode.setCurrentProfit(newNode.currentProfit());
+     relevantNode.setProfitUpperBound(newNode.profitUpperBound());
+     relevantNode.setNextItemId(newNode.nextItemId());
+     _searchNodes.add(relevantNode);
+ 
+     return true;
+   }
+ 
+   /**
+    * Recompute the profit bounds of every propagator and combine their upper bounds
+    * @return the smallest upper bound over all propagators, or Long.MAX_VALUE if there are none
+    */
+   private long getAggregatedProfitUpperBound() {
+     long upperBound = Long.MAX_VALUE;
+     for (KnapsackPropagator propagator : _propagators) {
+       propagator.computeProfitBounds();
+       final long propagatorUpperBound = propagator.profitUpperBound();
+       upperBound = Math.min(upperBound, propagatorUpperBound);
+     }
+     return upperBound;
+   }
+ 
+   /**
+    * Check whether the problem has a single dimension
+    * @return true if exactly one propagator exists
+    */
+   private boolean hasOnePropagator() {
+     return _propagators.size() == 1;
+   }
+ 
+   /**
+    * Get the current profit as reported by the master propagator
+    * @return current profit
+    */
+   private long getCurrentProfit() {
+     return _propagators.get(_masterPropagatorId).currentProfit();
+   }
+ 
+   /**
+    * Get the next item to branch on as reported by the master propagator
+    * @return item id
+    */
+   private int getNextItemId() {
+     return _propagators.get(_masterPropagatorId).getNextItemId();
+   }
+ 
+   /**
+    * A special comparator that orders knapsack search nodes in decreasing potential profit order:
+    * a node with a larger profit upper bound sorts first, and among equal upper bounds a node with
+    * a larger current profit sorts first
+    */
+   // TODO: check order
+   // NOTE(review): java.util.PriorityQueue serves the least element first, and this comparator
+   // makes the node with the largest upper bound the least one, so the most promising node is
+   // polled first - which appears to be the intended order.
+   private static class KnapsackSearchNodeInDecreasingUpperBoundComparator implements
+       Comparator<KnapsackSearchNode> {
+     @Override
+     public int compare(KnapsackSearchNode node1, KnapsackSearchNode node2) {
+       final long profitUpperBound1 = node1.profitUpperBound();
+       final long profitUpperBound2 = node2.profitUpperBound();
+       if (profitUpperBound1 == profitUpperBound2) {
+         // Tie on upper bound: prefer the node with the larger current profit
+         final long currentProfit1 = node1.currentProfit();
+         final long currentProfit2 = node2.currentProfit();
+         if (currentProfit1 > currentProfit2) {
+           return -1;
+         } else if (currentProfit1 < currentProfit2) {
+           return 1;
+         } else {
+           return 0;
+         }
+       }
+       if (profitUpperBound1 > profitUpperBound2) {
+         return -1;
+       } else if (profitUpperBound1 < profitUpperBound2) {
+         return 1;
+       } else {
+         // Not reachable: equal upper bounds are handled by the tie-break above
+         return 0;
+       }
+     }
+ 
+   }
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackItem.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackItem.java
index 0000000,3996816..70824a9
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackItem.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackItem.java
@@@ -1,0 -1,33 +1,52 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+ 
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ /**
+  * Immutable description of one candidate item for a knapsack, as seen from a single
+  * dimension<br/>
+  * <br/>
+  * Based on the C++ knapsack solver in Google's or-tools package.
+  */
+ public class KnapsackItem {
+   public final int id;
+   public final long weight;
+   public final long profit;
+ 
+   /**
+    * Build an item
+    * @param id the item id
+    * @param weight the cost to place the item in the knapsack for one dimension
+    * @param profit the benefit of placing the item in the knapsack
+    */
+   public KnapsackItem(int id, long weight, long profit) {
+     this.profit = profit;
+     this.weight = weight;
+     this.id = id;
+   }
+ 
+   /**
+    * Compute how much profit the item yields per unit of weight
+    * @param profitMax the value to report for an item whose weight is zero or negative, for which
+    *          no ratio is computed
+    * @return profit divided by weight when the weight is positive, profitMax otherwise
+    */
+   public double getEfficiency(long profitMax) {
+     if (weight <= 0) {
+       return (double) profitMax;
+     }
+     final double gain = (double) profit;
+     final double cost = (double) weight;
+     return gain / cost;
+   }
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackPropagator.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackPropagator.java
index 0000000,702bf1e..cb3eca7
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackPropagator.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackPropagator.java
@@@ -1,0 -1,61 +1,80 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+ 
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.ArrayList;
+ 
+ /**
+  * Constraint enforcer for a single dimension on a knapsack solution search. A propagator is
+  * told about each assignment that is applied or reverted, and reports the profit bounds and the
+  * next item to consider for the current state of the search.<br/>
+  * <br/>
+  * Based on the C++ knapsack solver in Google's or-tools package.
+  */
+ public interface KnapsackPropagator {
+   /**
+    * Initialize the propagator
+    * @param profits profits for selecting each item
+    * @param weights weights of each item for this dimension
+    */
+   void init(final ArrayList<Long> profits, final ArrayList<Long> weights);
+ 
+   /**
+    * Update the search by applying or reverting an assignment
+    * @param revert true to revert the assignment, false to apply it
+    * @param assignment the assignment to use for the update
+    * @return true if successful, false if failed
+    */
+   boolean update(boolean revert, final KnapsackAssignment assignment);
+ 
+   /**
+    * Compute the upper and lower bounds of potential profits. The results are read back through
+    * {@link #profitLowerBound()} and {@link #profitUpperBound()}.
+    */
+   void computeProfitBounds();
+ 
+   /**
+    * Get the next item to use in the search
+    * @return item id
+    */
+   int getNextItemId();
+ 
+   /**
+    * Get the current profit of the search
+    * @return current profit
+    */
+   long currentProfit();
+ 
+   /**
+    * Get the lowest possible profit of the search
+    * @return profit lower bound
+    */
+   long profitLowerBound();
+ 
+   /**
+    * Get the highest possible profit of the search
+    * @return profit upper bound
+    */
+   long profitUpperBound();
+ 
+   /**
+    * Copy the current computed state to the final solution
+    * @param hasOnePropagator true if there is only one propagator, i.e. 1 dimension
+    * @param solution the solution vector, with one entry per item
+    */
+   void copyCurrentStateToSolution(boolean hasOnePropagator, ArrayList<Boolean> solution);
+ }