You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/10 19:05:33 UTC
[50/50] [abbrv] git commit: Merge remote-tracking branch
'origin/helix-provisioning'
Merge remote-tracking branch 'origin/helix-provisioning'
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/713586c4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/713586c4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/713586c4
Branch: refs/heads/master
Commit: 713586c4282feb47e411f66d43af60132ec54e64
Parents: 884e071 0f79187
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu Jul 10 10:02:44 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Jul 10 10:02:44 2014 -0700
----------------------------------------------------------------------
helix-core/pom.xml | 4 +
.../java/org/apache/helix/HelixService.java | 8 +-
.../java/org/apache/helix/api/Controller.java | 4 +-
.../java/org/apache/helix/api/Participant.java | 14 +-
.../java/org/apache/helix/api/Resource.java | 34 +-
.../helix/api/accessor/ClusterAccessor.java | 11 +
.../helix/api/accessor/ParticipantAccessor.java | 15 +-
.../helix/api/accessor/ResourceAccessor.java | 14 +-
.../helix/api/config/ContainerConfig.java | 66 ++
.../apache/helix/api/config/ResourceConfig.java | 45 +-
.../controller/GenericHelixController.java | 2 +
.../context/ControllerContextProvider.java | 6 +-
.../controller/provisioner/ContainerId.java | 49 ++
.../provisioner/ContainerProvider.java | 36 ++
.../controller/provisioner/ContainerSpec.java | 65 ++
.../controller/provisioner/ContainerState.java | 34 ++
.../controller/provisioner/Provisioner.java | 42 ++
.../provisioner/ProvisionerConfig.java | 31 +
.../controller/provisioner/ProvisionerRef.java | 100 +++
.../controller/provisioner/ServiceConfig.java | 24 +
.../controller/provisioner/TargetProvider.java | 39 ++
.../provisioner/TargetProviderResponse.java | 68 +++
.../stages/ContainerProvisioningStage.java | 332 ++++++++++
.../stages/ResourceComputationStage.java | 1 +
.../strategy/AutoRebalanceStrategy.java | 4 +-
.../knapsack/AbstractBaseKnapsackSolver.java | 51 ++
.../knapsack/AbstractKnapsackPropagator.java | 123 ++++
.../strategy/knapsack/BaseKnapsackSolver.java | 68 +++
.../strategy/knapsack/KnapsackAssignment.java | 40 ++
.../KnapsackCapacityPropagatorImpl.java | 237 +++++++
.../knapsack/KnapsackGenericSolverImpl.java | 288 +++++++++
.../strategy/knapsack/KnapsackItem.java | 52 ++
.../strategy/knapsack/KnapsackPropagator.java | 80 +++
.../strategy/knapsack/KnapsackSearchNode.java | 81 +++
.../knapsack/KnapsackSearchNodeImpl.java | 96 +++
.../strategy/knapsack/KnapsackSearchPath.java | 58 ++
.../knapsack/KnapsackSearchPathImpl.java | 84 +++
.../strategy/knapsack/KnapsackSolver.java | 79 +++
.../strategy/knapsack/KnapsackSolverImpl.java | 210 +++++++
.../strategy/knapsack/KnapsackState.java | 61 ++
.../strategy/knapsack/KnapsackStateImpl.java | 80 +++
.../helix/healthcheck/DecayAggregationType.java | 3 +-
.../DefaultControllerMessageHandlerFactory.java | 7 +-
...ltParticipantErrorMessageHandlerFactory.java | 7 +-
.../DefaultSchedulerMessageHandlerFactory.java | 7 +-
.../manager/zk/HelixConnectionAdaptor.java | 3 +-
.../apache/helix/manager/zk/ZKHelixAdmin.java | 5 +-
.../helix/manager/zk/ZkBaseDataAccessor.java | 7 +-
.../manager/zk/ZkCacheBaseDataAccessor.java | 5 +-
.../helix/manager/zk/ZkCallbackHandler.java | 31 +-
.../helix/manager/zk/ZkHelixAutoController.java | 4 +-
.../helix/manager/zk/ZkHelixController.java | 13 +-
.../helix/manager/zk/ZkHelixParticipant.java | 4 +-
.../handling/AsyncCallbackService.java | 10 +-
.../handling/HelixStateTransitionHandler.java | 2 +
.../helix/model/ClusterConfiguration.java | 52 +-
.../java/org/apache/helix/model/IdealState.java | 16 +-
.../org/apache/helix/model/InstanceConfig.java | 80 ++-
.../java/org/apache/helix/model/Message.java | 3 +-
.../helix/model/ProvisionerConfigHolder.java | 184 ++++++
.../helix/model/ResourceConfiguration.java | 68 ++-
.../participant/AbstractParticipantService.java | 142 +++++
.../helix/participant/StateMachineEngine.java | 3 +-
.../helix/task/FixedTargetTaskRebalancer.java | 163 +++++
.../helix/task/GenericTaskRebalancer.java | 277 +++++++++
.../java/org/apache/helix/task/JobConfig.java | 381 ++++++++++++
.../java/org/apache/helix/task/JobContext.java | 248 ++++++++
.../main/java/org/apache/helix/task/JobDag.java | 151 +++++
.../org/apache/helix/task/ScheduleConfig.java | 162 +++++
.../java/org/apache/helix/task/TargetState.java | 8 +-
.../apache/helix/task/TaskCallbackContext.java | 67 ++
.../java/org/apache/helix/task/TaskConfig.java | 340 +++--------
.../org/apache/helix/task/TaskConstants.java | 4 +
.../java/org/apache/helix/task/TaskContext.java | 135 ----
.../java/org/apache/helix/task/TaskDag.java | 152 -----
.../java/org/apache/helix/task/TaskDriver.java | 213 +++++--
.../java/org/apache/helix/task/TaskFactory.java | 5 +-
.../org/apache/helix/task/TaskRebalancer.java | 611 ++++++++++++-------
.../java/org/apache/helix/task/TaskRunner.java | 18 +-
.../org/apache/helix/task/TaskStateModel.java | 46 +-
.../helix/task/TaskStateModelFactory.java | 8 +-
.../java/org/apache/helix/task/TaskUtil.java | 334 ++++++++--
.../java/org/apache/helix/task/Workflow.java | 243 +++++---
.../org/apache/helix/task/WorkflowConfig.java | 58 +-
.../org/apache/helix/task/WorkflowContext.java | 73 +--
.../org/apache/helix/task/beans/JobBean.java | 44 ++
.../apache/helix/task/beans/ScheduleBean.java | 32 +
.../org/apache/helix/task/beans/TaskBean.java | 17 +-
.../apache/helix/task/beans/WorkflowBean.java | 7 +-
.../org/apache/helix/tools/ClusterSetup.java | 6 +-
.../helix/tools/StateModelConfigGenerator.java | 54 ++
.../org/apache/helix/util/StatusUpdateUtil.java | 5 +-
.../java/org/apache/helix/ZkTestHelper.java | 6 +-
.../helix/controller/stages/BaseStageTest.java | 2 +-
.../stages/TestMsgSelectionStage.java | 8 +-
.../strategy/TestNewAutoRebalanceStrategy.java | 2 +-
.../helix/healthcheck/TestAlertFireHistory.java | 1 -
.../helix/healthcheck/TestExpandAlert.java | 3 +-
.../helix/healthcheck/TestSimpleAlert.java | 3 +-
.../healthcheck/TestSimpleWildcardAlert.java | 3 +-
.../helix/healthcheck/TestStalenessAlert.java | 3 +-
.../helix/healthcheck/TestWildcardAlert.java | 3 +-
.../TestAddNodeAfterControllerStart.java | 4 +-
.../TestAddStateModelFactoryAfterConnect.java | 4 +-
.../helix/integration/TestAutoRebalance.java | 3 +-
.../integration/TestCleanupExternalView.java | 8 +-
.../helix/integration/TestHelixConnection.java | 8 +-
.../integration/TestLocalContainerProvider.java | 346 +++++++++++
.../TestMessagePartitionStateMismatch.java | 3 +-
.../TestParticipantErrorMessage.java | 9 +-
.../integration/TestResetPartitionState.java | 7 +-
.../helix/integration/TestSharedConnection.java | 12 +-
.../helix/integration/TestStandAloneCMMain.java | 3 +-
.../integration/TestStateTransitionTimeout.java | 3 +-
...dAloneCMTestBaseWithPropertyServerCheck.java | 30 +-
.../manager/ClusterDistributedController.java | 3 +-
.../task/TestIndependentTaskRebalancer.java | 331 ++++++++++
.../integration/task/TestTaskRebalancer.java | 136 +++--
.../task/TestTaskRebalancerStopResume.java | 59 +-
.../apache/helix/integration/task/TestUtil.java | 15 +-
.../integration/task/WorkflowGenerator.java | 74 ++-
.../zk/TestZKPropertyTransferServer.java | 6 +-
.../manager/zk/TestZkHelixAutoController.java | 4 +-
.../helix/manager/zk/TestZkHelixController.java | 6 +-
.../manager/zk/TestZkHelixParticipant.java | 9 +-
.../zk/TestZkManagerFlappingDetection.java | 1 -
.../handling/TestConfigThreadpoolSize.java | 7 +-
.../handling/TestHelixTaskExecutor.java | 9 +-
.../apache/helix/tools/TestHelixAdminCli.java | 3 +-
.../helix/examples/LogicalModelExample.java | 12 +-
helix-provisioning/.gitignore | 16 +
helix-provisioning/DISCLAIMER | 15 +
helix-provisioning/LICENSE | 273 +++++++++
helix-provisioning/NOTICE | 30 +
helix-provisioning/README.md | 35 ++
helix-provisioning/pom.xml | 118 ++++
helix-provisioning/src/assemble/assembly.xml | 60 ++
.../src/main/config/log4j.properties | 31 +
.../apache/helix/provisioning/AppConfig.java | 35 ++
.../helix/provisioning/ApplicationSpec.java | 46 ++
.../provisioning/ApplicationSpecFactory.java | 28 +
.../provisioning/ContainerAskResponse.java | 36 ++
.../provisioning/ContainerLaunchResponse.java | 24 +
.../provisioning/ContainerReleaseResponse.java | 24 +
.../provisioning/ContainerStopResponse.java | 24 +
.../helix/provisioning/HelixYarnUtil.java | 61 ++
.../helix/provisioning/ParticipantLauncher.java | 156 +++++
.../helix/provisioning/ServiceConfig.java | 32 +
.../apache/helix/provisioning/TaskConfig.java | 48 ++
.../StatelessParticipantService.java | 86 +++
.../participant/StatelessServiceStateModel.java | 56 ++
.../StatelessServiceStateModelFactory.java | 39 ++
.../provisioning/tools/ContainerAdmin.java | 116 ++++
.../tools/UpdateProvisionerConfig.java | 106 ++++
.../helix/provisioning/yarn/AppLauncher.java | 580 ++++++++++++++++++
.../provisioning/yarn/AppMasterConfig.java | 130 ++++
.../provisioning/yarn/AppMasterLauncher.java | 213 +++++++
.../yarn/AppStatusReportGenerator.java | 103 ++++
.../provisioning/yarn/FixedTargetProvider.java | 39 ++
.../yarn/GenericApplicationMaster.java | 316 ++++++++++
.../yarn/LaunchContainerRunnable.java | 98 +++
.../provisioning/yarn/NMCallbackHandler.java | 103 ++++
.../provisioning/yarn/RMCallbackHandler.java | 150 +++++
.../provisioning/yarn/YarnProvisioner.java | 411 +++++++++++++
.../yarn/YarnProvisionerConfig.java | 73 +++
.../src/main/resources/sample_application.yaml | 42 ++
helix-provisioning/src/test/conf/testng.xml | 27 +
pom.xml | 6 +
recipes/helloworld-provisioning-yarn/pom.xml | 158 +++++
recipes/helloworld-provisioning-yarn/run.sh | 6 +
.../src/assemble/assembly.xml | 60 ++
.../src/main/config/log4j.properties | 31 +
.../yarn/example/HelloWordAppSpecFactory.java | 48 ++
.../yarn/example/HelloWorldService.java | 56 ++
.../yarn/example/HelloworldAppSpec.java | 167 +++++
.../main/resources/hello_world_app_spec.yaml | 42 ++
.../src/test/conf/testng.xml | 27 +
recipes/jobrunner-yarn/pom.xml | 158 +++++
recipes/jobrunner-yarn/run.sh | 6 +
.../jobrunner-yarn/src/assemble/assembly.xml | 60 ++
.../src/main/config/log4j.properties | 31 +
.../yarn/example/JobRunnerMain.java | 151 +++++
.../helix/provisioning/yarn/example/MyTask.java | 72 +++
.../yarn/example/MyTaskAppSpec.java | 167 +++++
.../yarn/example/MyTaskAppSpecFactory.java | 47 ++
.../yarn/example/MyTaskService.java | 81 +++
.../src/main/resources/dummy_job.yaml | 36 ++
.../src/main/resources/job_runner_app_spec.yaml | 41 ++
recipes/jobrunner-yarn/src/test/conf/testng.xml | 27 +
recipes/pom.xml | 2 +
190 files changed, 12263 insertions(+), 1400 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/pom.xml
----------------------------------------------------------------------
diff --cc helix-core/pom.xml
index 7ca41d7,0f1f2b9..a8414f4
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@@ -213,9 -217,9 +213,13 @@@ under the License
<name>test-util</name>
</program>
<program>
+ <mainClass>org.apache.helix.tools.ZkGrep</mainClass>
+ <name>zkgrep</name>
+ </program>
++ <program>
+ <mainClass>org.apache.helix.task.TaskDriver</mainClass>
+ <name>task-driver</name>
+ </program>
</programs>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index a41da5a,e653338..c85dd0b
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@@ -55,7 -54,7 +55,8 @@@ import org.apache.helix.api.id.SessionI
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.context.ControllerContext;
import org.apache.helix.controller.context.ControllerContextHolder;
+ import org.apache.helix.controller.provisioner.ProvisionerConfig;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
@@@ -768,22 -700,6 +770,27 @@@ public class ClusterAccessor
if (idealState != null) {
_accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
}
+
+ // Add resource user config
+ if (resource.getUserConfig() != null) {
+ ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
+ configuration.setType(resource.getType());
+ configuration.addNamespacedConfig(resource.getUserConfig());
+ PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
+ if (idealState == null
+ && (partitionedConfig == null || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED)) {
+ // only persist if this is not easily convertible to an ideal state
+ configuration
+ .addNamespacedConfig(new RebalancerConfigHolder(resource.getRebalancerConfig())
+ .toNamespacedConfig());
+ }
++ ProvisionerConfig provisionerConfig = resource.getProvisionerConfig();
++ if (provisionerConfig != null) {
++ configuration.addNamespacedConfig(new ProvisionerConfigHolder(provisionerConfig)
++ .toNamespacedConfig());
++ }
+ _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+ }
return true;
}
@@@ -812,10 -728,14 +819,14 @@@
BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
if (baseAccessor != null) {
boolean[] existsResults = baseAccessor.exists(paths, 0);
- int ind =0;
++ int ind = 0;
for (boolean exists : existsResults) {
-
++
if (!exists) {
- LOG.warn("Path does not exist:"+ paths.get(ind));
++ LOG.warn("Path does not exist:" + paths.get(ind));
return false;
}
+ ind = ind + 1;
}
}
return true;
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index d0cc3ba,cf4c549..cb52e91
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@@ -43,9 -43,9 +43,10 @@@ import org.apache.helix.api.Resource
import org.apache.helix.api.RunningInstance;
import org.apache.helix.api.Scope;
import org.apache.helix.api.State;
+ import org.apache.helix.api.config.ContainerConfig;
import org.apache.helix.api.config.ParticipantConfig;
import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.MessageId;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index 310b457,0052871..7dde6ee
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@@ -39,7 -38,7 +39,8 @@@ import org.apache.helix.api.id.ClusterI
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
+ import org.apache.helix.controller.provisioner.ProvisionerConfig;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
@@@ -274,11 -260,13 +276,17 @@@ public class ResourceAccessor
// only persist if this is not easily convertible to an ideal state
config.addNamespacedConfig(new RebalancerConfigHolder(resourceConfig.getRebalancerConfig())
.toNamespacedConfig());
+ config.setBucketSize(resourceConfig.getBucketSize());
+ config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
+ } else if (userConfig == null) {
+ config = null;
}
+ if (resourceConfig.getProvisionerConfig() != null) {
+ config.addNamespacedConfig(new ProvisionerConfigHolder(resourceConfig.getProvisionerConfig())
+ .toNamespacedConfig());
+ }
+ config.setBucketSize(resourceConfig.getBucketSize());
+ config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
setConfiguration(resourceId, config, resourceConfig.getRebalancerConfig());
return true;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index c8b033e,b5fb23e..f9af914
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@@ -48,10 -45,9 +48,11 @@@ import org.apache.helix.api.id.SessionI
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.PipelineRegistry;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ClusterEventBlockingQueue;
import org.apache.helix.controller.stages.CompatibilityCheckStage;
+ import org.apache.helix.controller.stages.ContainerProvisioningStage;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.ExternalViewComputeStage;
import org.apache.helix.controller.stages.MessageGenerationStage;
@@@ -201,7 -183,7 +202,8 @@@ public class GenericHelixController imp
Pipeline rebalancePipeline = new Pipeline();
rebalancePipeline.addStage(new CompatibilityCheckStage());
rebalancePipeline.addStage(new ResourceComputationStage());
+ rebalancePipeline.addStage(new ResourceValidationStage());
+ rebalancePipeline.addStage(new ContainerProvisioningStage());
rebalancePipeline.addStage(new CurrentStateComputationStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
rebalancePipeline.addStage(new MessageGenerationStage());
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerId.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerId.java
index 0000000,49bc0fc..b42d881
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerId.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerId.java
@@@ -1,0 -1,30 +1,49 @@@
+ package org.apache.helix.controller.provisioner;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import org.apache.helix.api.id.Id;
+
+ public class ContainerId extends Id {
+
+ String id;
+
+ private ContainerId(String containerId) {
+ this.id = containerId;
+ }
+
+ @Override
+ public String stringify() {
+ return id;
+ }
+
+ /**
+ * Get a concrete partition id
+ * @param partitionId string partition identifier
+ * @return PartitionId
+ */
+ public static ContainerId from(String containerId) {
+ if (containerId == null) {
+ return null;
+ }
+ return new ContainerId(containerId);
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
index 0000000,ab3c46a..36ad7f9
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
@@@ -1,0 -1,65 +1,65 @@@
+ package org.apache.helix.controller.provisioner;
+
+ import org.apache.helix.api.id.ParticipantId;
+
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ public class ContainerSpec {
+ /**
+ * Some unique id representing the container.
+ */
+ ContainerId _containerId;
-
++
+ int _memory;
+
+ private ParticipantId _participantId;
+
+ public ContainerSpec(ParticipantId _participantId) {
+ this._participantId = _participantId;
+ }
+
+ public ContainerId getContainerId() {
+ return _containerId;
+ }
+
+ @Override
+ public String toString() {
+ return _participantId.toString();
+ }
-
- public void setMemory(int memory){
++
++ public void setMemory(int memory) {
+ _memory = memory;
+ }
+
- public int getMemory(){
++ public int getMemory() {
+ return _memory;
+ }
-
++
+ public static ContainerSpec from(String serialized) {
- //todo
++ // todo
+ return null;
- //return new ContainerSpec(ContainerId.from(serialized));
++ // return new ContainerSpec(ContainerId.from(serialized));
+ }
+
+ public ParticipantId getParticipantId() {
+ return _participantId;
+ }
-
++
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/provisioner/ServiceConfig.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/provisioner/ServiceConfig.java
index 0000000,adccb2c..9370cfd
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ServiceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ServiceConfig.java
@@@ -1,0 -1,5 +1,24 @@@
+ package org.apache.helix.controller.provisioner;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ public class ServiceConfig {
+
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/provisioner/TargetProvider.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/provisioner/TargetProvider.java
index 0000000,063d008..1e5957b
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/TargetProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/TargetProvider.java
@@@ -1,0 -1,42 +1,39 @@@
+ package org.apache.helix.controller.provisioner;
+
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ import java.util.Collection;
+
-import org.apache.helix.HelixManager;
+ import org.apache.helix.api.Cluster;
+ import org.apache.helix.api.Participant;
-import org.apache.helix.api.config.ResourceConfig;
+ import org.apache.helix.api.id.ResourceId;
+
+ public interface TargetProvider {
+
-
+ /**
+ * @param cluster
+ * @param resourceId ResourceId name of the resource
+ * @param participants
+ * @return
+ */
+ TargetProviderResponse evaluateExistingContainers(Cluster cluster, ResourceId resourceId,
+ Collection<Participant> participants);
+
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index 0000000,ae433e0..25645d3
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@@ -1,0 -1,332 +1,332 @@@
+ package org.apache.helix.controller.stages;
+
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ import java.util.Collection;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.UUID;
+
+ import org.apache.helix.HelixAdmin;
+ import org.apache.helix.HelixDataAccessor;
+ import org.apache.helix.HelixManager;
+ import org.apache.helix.PropertyKey;
+ import org.apache.helix.api.Cluster;
+ import org.apache.helix.api.Participant;
+ import org.apache.helix.api.config.ContainerConfig;
+ import org.apache.helix.api.config.ResourceConfig;
+ import org.apache.helix.api.id.ParticipantId;
+ import org.apache.helix.api.id.ResourceId;
+ import org.apache.helix.controller.pipeline.AbstractBaseStage;
+ import org.apache.helix.controller.provisioner.ContainerId;
+ import org.apache.helix.controller.provisioner.ContainerProvider;
+ import org.apache.helix.controller.provisioner.ContainerSpec;
+ import org.apache.helix.controller.provisioner.ContainerState;
+ import org.apache.helix.controller.provisioner.Provisioner;
+ import org.apache.helix.controller.provisioner.ProvisionerConfig;
+ import org.apache.helix.controller.provisioner.ProvisionerRef;
+ import org.apache.helix.controller.provisioner.TargetProvider;
+ import org.apache.helix.controller.provisioner.TargetProviderResponse;
+ import org.apache.helix.model.InstanceConfig;
+ import org.apache.helix.model.Message;
+ import org.apache.helix.model.Message.MessageType;
+ import org.apache.log4j.Logger;
+
+ import com.google.common.util.concurrent.FutureCallback;
+ import com.google.common.util.concurrent.Futures;
+ import com.google.common.util.concurrent.ListenableFuture;
+
+ /**
+ * This stage will manager the container allocation/deallocation needed for a
+ * specific resource.<br/>
+ * It does the following <br/>
+ * From the idealstate, it gets ContainerTargetProvider and ContainerProvider <br/>
+ * ContainerTargetProviderFactory will provide the number of containers needed
+ * for a resource <br/>
+ * ContainerProvider will provide the ability to allocate, deallocate, start,
+ * stop container <br/>
+ */
+ public class ContainerProvisioningStage extends AbstractBaseStage {
+ private static final Logger LOG = Logger.getLogger(ContainerProvisioningStage.class);
+
+ Map<ResourceId, Provisioner> _provisionerMap = new HashMap<ResourceId, Provisioner>();
+ Map<ResourceId, TargetProvider> _targetProviderMap = new HashMap<ResourceId, TargetProvider>();
+ Map<ResourceId, ContainerProvider> _containerProviderMap =
+ new HashMap<ResourceId, ContainerProvider>();
+
+ @Override
+ public void process(ClusterEvent event) throws Exception {
+ final HelixManager helixManager = event.getAttribute("helixmanager");
+ final Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
+ final HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
+ final HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
+ final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ for (ResourceId resourceId : resourceMap.keySet()) {
+ ResourceConfig resourceConfig = resourceMap.get(resourceId);
+ ProvisionerConfig provisionerConfig = resourceConfig.getProvisionerConfig();
+ if (provisionerConfig != null) {
+ Provisioner provisioner;
+ provisioner = _provisionerMap.get(resourceId);
+
+ // instantiate and cache a provisioner if there isn't one already cached
+ if (provisioner == null) {
+ ProvisionerRef provisionerRef = provisionerConfig.getProvisionerRef();
+ if (provisionerRef != null) {
+ provisioner = provisionerRef.getProvisioner();
+ }
+ if (provisioner != null) {
+ provisioner.init(helixManager, resourceConfig);
+ _containerProviderMap.put(resourceId, provisioner.getContainerProvider());
+ _targetProviderMap.put(resourceId, provisioner.getTargetProvider());
+ _provisionerMap.put(resourceId, provisioner);
+ } else {
+ LOG.error("Resource " + resourceId + " does not have a valid provisioner class!");
+ break;
+ }
+ }
+ TargetProvider targetProvider = _targetProviderMap.get(resourceId);
+ ContainerProvider containerProvider = _containerProviderMap.get(resourceId);
- final Cluster cluster = event.getAttribute("ClusterDataCache");
++ final Cluster cluster = event.getAttribute("Cluster");
+ final Collection<Participant> participants = cluster.getParticipantMap().values();
+
+ // If a process died, we need to mark it as DISCONNECTED or if the process is ready, mark as
+ // CONNECTED
+ Map<ParticipantId, Participant> participantMap = cluster.getParticipantMap();
+ for (ParticipantId participantId : participantMap.keySet()) {
+ Participant participant = participantMap.get(participantId);
+ ContainerConfig config = participant.getContainerConfig();
+ if (config != null) {
+ ContainerState containerState = config.getState();
+ if (!participant.isAlive() && ContainerState.CONNECTED.equals(containerState)) {
+ // Need to mark as disconnected if process died
+ LOG.info("Participant " + participantId + " died, marking as DISCONNECTED");
+ updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
+ ContainerState.DISCONNECTED);
+ } else if (participant.isAlive() && ContainerState.CONNECTING.equals(containerState)) {
+ // Need to mark as connected only when the live instance is visible
+ LOG.info("Participant " + participantId + " is ready, marking as CONNECTED");
+ updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
+ ContainerState.CONNECTED);
+ } else if (!participant.isAlive() && ContainerState.HALTING.equals(containerState)) {
+ // Need to mark as connected only when the live instance is visible
+ LOG.info("Participant " + participantId + " is has been killed, marking as HALTED");
+ updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
+ ContainerState.HALTED);
+ }
+ }
+ }
+
+ // Participants registered in helix
+ // Give those participants to targetprovider
+ // Provide the response that contains, new containerspecs, containers to be released,
+ // containers to be stopped
+ // call the respective provisioner to allocate and start the container.
+ // Each container is then started its state is changed from any place.
+ // The target provider is given the state of container and asked for its new state. For each
+ // state there is a corresponding handler function.
+
+ // TargetProvider should be stateless, given the state of cluster and existing participants
+ // it should return the same result
+ final TargetProviderResponse response =
+ targetProvider.evaluateExistingContainers(cluster, resourceId, participants);
+
+ // allocate new containers
+ for (final ContainerSpec spec : response.getContainersToAcquire()) {
+ final ParticipantId participantId = spec.getParticipantId();
+ List<String> instancesInCluster =
+ helixAdmin.getInstancesInCluster(cluster.getId().stringify());
+ if (!instancesInCluster.contains(participantId.stringify())) {
+ // create a new Participant, attach the container spec
+ InstanceConfig instanceConfig = new InstanceConfig(participantId);
+ instanceConfig.setContainerSpec(spec);
+ // create a helix_participant in ACQUIRING state
+ instanceConfig.setContainerState(ContainerState.ACQUIRING);
+ // create the helix participant and add it to cluster
+ helixAdmin.addInstance(cluster.getId().toString(), instanceConfig);
+ }
+ LOG.info("Allocating container for " + participantId);
+ ListenableFuture<ContainerId> future = containerProvider.allocateContainer(spec);
+ FutureCallback<ContainerId> callback = new FutureCallback<ContainerId>() {
+ @Override
+ public void onSuccess(ContainerId containerId) {
+ LOG.info("Container " + containerId + " acquired. Marking " + participantId);
+ InstanceConfig existingInstance =
+ helixAdmin
+ .getInstanceConfig(cluster.getId().toString(), participantId.toString());
+ existingInstance.setContainerId(containerId);
+ existingInstance.setContainerState(ContainerState.ACQUIRED);
+ accessor.updateProperty(keyBuilder.instanceConfig(participantId.toString()),
+ existingInstance);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Could not allocate a container for participant " + participantId, t);
+ updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
+ ContainerState.FAILED);
+ }
+ };
+ safeAddCallback(future, callback);
+ }
+
+ // start new containers
+ for (final Participant participant : response.getContainersToStart()) {
+ final InstanceConfig existingInstance =
+ helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
+ .toString());
+ final ContainerId containerId = existingInstance.getContainerId();
+ existingInstance.setContainerState(ContainerState.CONNECTING);
+ accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
+ existingInstance);
+ // create the helix participant and add it to cluster
+ LOG.info("Starting container " + containerId + " for " + participant.getId());
+ ListenableFuture<Boolean> future =
+ containerProvider.startContainer(containerId, participant);
+ FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
+ @Override
+ public void onSuccess(Boolean result) {
+ // Do nothing yet, need to wait for live instance
+ LOG.info("Container " + containerId + " started for " + participant.getId());
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Could not start container" + containerId + "for participant "
+ + participant.getId(), t);
+ updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
+ ContainerState.FAILED);
+ }
+ };
+ safeAddCallback(future, callback);
+ }
+
+ // release containers
+ for (final Participant participant : response.getContainersToRelease()) {
+ // mark it as finalizing
+ final InstanceConfig existingInstance =
+ helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
+ .toString());
+ final ContainerId containerId = existingInstance.getContainerId();
+ existingInstance.setContainerState(ContainerState.FINALIZING);
+ accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
+ existingInstance);
+ // remove the participant
+ LOG.info("Deallocating container " + containerId + " for " + participant.getId());
+ ListenableFuture<Boolean> future = containerProvider.deallocateContainer(containerId);
+ FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
+ @Override
+ public void onSuccess(Boolean result) {
+ LOG.info("Container " + containerId + " deallocated. Dropping " + participant.getId());
+ InstanceConfig existingInstance =
+ helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
+ .toString());
+ helixAdmin.dropInstance(cluster.getId().toString(), existingInstance);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Could not deallocate container" + containerId + "for participant "
+ + participant.getId(), t);
+ updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
+ ContainerState.FAILED);
+ }
+ };
+ safeAddCallback(future, callback);
+ }
+
+ // stop but don't remove
+ for (final Participant participant : response.getContainersToStop()) {
+ // disable the node first
+ final InstanceConfig existingInstance =
+ helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
+ .toString());
+ final ContainerId containerId = existingInstance.getContainerId();
+ existingInstance.setInstanceEnabled(false);
+ existingInstance.setContainerState(ContainerState.HALTING);
+ accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
+ existingInstance);
+ // stop the container
+ LOG.info("Stopping container " + containerId + " for " + participant.getId());
+ ListenableFuture<Boolean> future = containerProvider.stopContainer(containerId);
+ FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
+ @Override
+ public void onSuccess(Boolean result) {
+ // Don't update the state here, wait for the live instance, but do send a shutdown
+ // message
+ LOG.info("Container " + containerId + " stopped for " + participant.getId());
+ if (participant.isAlive()) {
+ Message message = new Message(MessageType.SHUTDOWN, UUID.randomUUID().toString());
+ message.setTgtName(participant.getId().toString());
+ message.setTgtSessionId(participant.getRunningInstance().getSessionId());
+ message.setMsgId(message.getId());
+ accessor.createProperty(
+ keyBuilder.message(participant.getId().toString(), message.getId()), message);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error(
+ "Could not stop container" + containerId + "for participant "
+ + participant.getId(), t);
+ updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
+ ContainerState.FAILED);
+ }
+ };
+ safeAddCallback(future, callback);
+ }
+ }
+ }
+ }
+
+ /**
+ * Update a participant with a new container state
+ * @param helixAdmin
+ * @param accessor
+ * @param keyBuilder
+ * @param cluster
+ * @param participantId
+ * @param state
+ */
+ private void updateContainerState(HelixAdmin helixAdmin, HelixDataAccessor accessor,
+ PropertyKey.Builder keyBuilder, Cluster cluster, ParticipantId participantId,
+ ContainerState state) {
+ InstanceConfig existingInstance =
+ helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString());
+ existingInstance.setContainerState(state);
+ existingInstance.setInstanceEnabled(state.equals(ContainerState.CONNECTED));
+ accessor.updateProperty(keyBuilder.instanceConfig(participantId.toString()), existingInstance);
+ }
+
+ /**
+ * Add a callback, failing if the add fails
+ * @param future the future to listen on
+ * @param callback the callback to invoke
+ */
+ private <T> void safeAddCallback(ListenableFuture<T> future, FutureCallback<T> callback) {
+ try {
+ Futures.addCallback(future, callback);
+ } catch (Throwable t) {
+ callback.onFailure(t);
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index 5a8de20,bff7e46..09b66c1
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@@ -510,13 -491,12 +510,13 @@@ public class AutoRebalanceStrategy
* @return The current assignments that do not conform to the preferred assignment
*/
private Map<Replica, Node> computeExistingNonPreferredPlacement(
- Map<String, Map<String, String>> currentMapping) {
+ Map<PartitionId, Map<ParticipantId, State>> currentMapping) {
Map<Replica, Node> existingNonPreferredAssignment = new TreeMap<Replica, Node>();
int count = countStateReplicas();
- for (String partition : currentMapping.keySet()) {
- Map<String, String> nodeStateMap = currentMapping.get(partition);
- for (String nodeId : nodeStateMap.keySet()) {
+ for (PartitionId partition : currentMapping.keySet()) {
+ Map<ParticipantId, State> nodeStateMap = currentMapping.get(partition);
++ nodeStateMap.keySet().retainAll(_nodeMap.keySet());
+ for (ParticipantId nodeId : nodeStateMap.keySet()) {
- nodeStateMap.keySet().retainAll(_nodeMap.keySet());
Node node = _nodeMap.get(nodeId);
boolean skip = false;
for (Replica replica : node.preferred) {
@@@ -580,13 -560,12 +580,13 @@@
* @return Assignments that conform to the preferred placement
*/
private Map<Replica, Node> computeExistingPreferredPlacement(
- final Map<String, Map<String, String>> currentMapping) {
+ final Map<PartitionId, Map<ParticipantId, State>> currentMapping) {
Map<Replica, Node> existingPreferredAssignment = new TreeMap<Replica, Node>();
int count = countStateReplicas();
- for (String partition : currentMapping.keySet()) {
- Map<String, String> nodeStateMap = currentMapping.get(partition);
- for (String nodeId : nodeStateMap.keySet()) {
+ for (PartitionId partition : currentMapping.keySet()) {
+ Map<ParticipantId, State> nodeStateMap = currentMapping.get(partition);
++ nodeStateMap.keySet().retainAll(_nodeMap.keySet());
+ for (ParticipantId nodeId : nodeStateMap.keySet()) {
- nodeStateMap.keySet().retainAll(_nodeMap.keySet());
Node node = _nodeMap.get(nodeId);
node.currentlyAssigned = node.currentlyAssigned + 1;
// check if its in one of the preferred position
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/AbstractBaseKnapsackSolver.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/AbstractBaseKnapsackSolver.java
index 0000000,4d27bd7..a0de0e7
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/AbstractBaseKnapsackSolver.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/AbstractBaseKnapsackSolver.java
@@@ -1,0 -1,32 +1,51 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ /**
+ * Common implementation of a knapsack solver<br/>
+ * <br/>
+ * Based on the C++ knapsack solver in Google's or-tools package.
+ */
+ public abstract class AbstractBaseKnapsackSolver implements BaseKnapsackSolver {
+ private final String _solverName;
+
+ /**
+ * Initialize the solver
+ * @param solverName the name of the solvers
+ */
+ public AbstractBaseKnapsackSolver(final String solverName) {
+ _solverName = solverName;
+ }
+
+ @Override
+ public long[] getLowerAndUpperBoundWhenItem(int itemId, boolean isItemIn, long lowerBound,
+ long upperBound) {
+ return new long[] {
+ 0L, Long.MAX_VALUE
+ };
+ }
+
+ @Override
+ public String getName() {
+ return _solverName;
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/AbstractKnapsackPropagator.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/AbstractKnapsackPropagator.java
index 0000000,0663990..6f9de66
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/AbstractKnapsackPropagator.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/AbstractKnapsackPropagator.java
@@@ -1,0 -1,104 +1,123 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.ArrayList;
+
+ /**
+ * Common implementation of a knapsack constraint satisfier<br/>
+ * <br/>
+ * Based on the C++ knapsack solver in Google's or-tools package.
+ */
+ public abstract class AbstractKnapsackPropagator implements KnapsackPropagator {
+ private ArrayList<KnapsackItem> _items;
+ private long _currentProfit;
+ private long _profitLowerBound;
+ private long _profitUpperBound;
+ private KnapsackState _state;
+
+ /**
+ * Initialize the propagator
+ * @param state the current knapsack state
+ */
+ public AbstractKnapsackPropagator(final KnapsackState state) {
+ _items = new ArrayList<KnapsackItem>();
+ _currentProfit = 0L;
+ _profitLowerBound = 0L;
+ _profitUpperBound = Long.MAX_VALUE;
+ _state = state;
+ }
+
+ @Override
+ public void init(ArrayList<Long> profits, ArrayList<Long> weights) {
+ final int numberOfItems = profits.size();
+ _items.clear();
+ for (int i = 0; i < numberOfItems; i++) {
+ _items.add(new KnapsackItem(i, weights.get(i), profits.get(i)));
+ }
+ _currentProfit = 0;
+ _profitLowerBound = Long.MIN_VALUE;
+ _profitUpperBound = Long.MAX_VALUE;
+ initPropagator();
+ }
+
+ @Override
+ public boolean update(boolean revert, KnapsackAssignment assignment) {
+ if (assignment.isIn) {
+ if (revert) {
+ _currentProfit -= _items.get(assignment.itemId).profit;
+ } else {
+ _currentProfit += _items.get(assignment.itemId).profit;
+ }
+ }
+ return updatePropagator(revert, assignment);
+ }
+
+ @Override
+ public long currentProfit() {
+ return _currentProfit;
+ }
+
+ @Override
+ public long profitLowerBound() {
+ return _profitLowerBound;
+ }
+
+ @Override
+ public long profitUpperBound() {
+ return _profitUpperBound;
+ }
+
+ @Override
+ public void copyCurrentStateToSolution(boolean hasOnePropagator, ArrayList<Boolean> solution) {
+ if (solution == null) {
+ throw new RuntimeException("solution cannot be null!");
+ }
+ for (KnapsackItem item : _items) {
+ final int itemId = item.id;
+ solution.set(itemId, _state.isBound(itemId) && _state.isIn(itemId));
+ }
+ if (hasOnePropagator) {
+ copyCurrentStateToSolutionPropagator(solution);
+ }
+ }
+
+ protected abstract void initPropagator();
+
+ protected abstract boolean updatePropagator(boolean revert, final KnapsackAssignment assignment);
+
+ protected abstract void copyCurrentStateToSolutionPropagator(ArrayList<Boolean> solution);
+
+ protected KnapsackState state() {
+ return _state;
+ }
+
+ protected ArrayList<KnapsackItem> items() {
+ return _items;
+ }
+
+ protected void setProfitLowerBound(long profit) {
+ _profitLowerBound = profit;
+ }
+
+ protected void setProfitUpperBound(long profit) {
+ _profitUpperBound = profit;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/BaseKnapsackSolver.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/BaseKnapsackSolver.java
index 0000000,1d71a22..51221e5
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/BaseKnapsackSolver.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/BaseKnapsackSolver.java
@@@ -1,0 -1,49 +1,68 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.ArrayList;
+
+ /**
+ * The interface of any multidimensional knapsack solver<br/>
+ * <br/>
+ * Based on the C++ knapsack solver in Google's or-tools package.
+ */
+ public interface BaseKnapsackSolver {
+ /**
+ * Initialize the solver
+ * @param profits profit of adding each item to the knapsack
+ * @param weights cost of adding each item in each dimension
+ * @param capacities maximum weight per dimension
+ */
+ void init(final ArrayList<Long> profits, final ArrayList<ArrayList<Long>> weights,
+ final ArrayList<Long> capacities);
+
+ /**
+ * Compute an upper and lower bound on the knapsack given the assignment state of the knapsack
+ * @param itemId the item id
+ * @param isItemIn true if the item is in the knapsack, false otherwise
+ * @param lowerBound the current lower bound
+ * @param upperBound the current upper bound
+ * @return the new lower and upper bounds
+ */
+ long[] getLowerAndUpperBoundWhenItem(int itemId, boolean isItemIn, long lowerBound,
+ long upperBound);
+
+ /**
+ * Solve the knapsack problem
+ * @return the (approximate) optimal profit
+ */
+ long solve();
+
+ /**
+ * Check if an item is in the final solution
+ * @param itemId the item id
+ * @return true if the item is present, false otherwise
+ */
+ boolean bestSolution(int itemId);
+
+ /**
+ * Get the solver name
+ * @return solver name
+ */
+ String getName();
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackAssignment.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackAssignment.java
index 0000000,bfd29d7..50f58a7
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackAssignment.java
@@@ -1,0 -1,21 +1,40 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ /**
+ * The assignment of a knapsack item to a knapsack<br/>
+ * <br/>
+ * Based on the C++ knapsack solver in Google's or-tools package.
+ */
+ public class KnapsackAssignment {
+ public int itemId;
+ public boolean isIn;
+
+ /**
+ * Create the assignment
+ * @param itemId the item id
+ * @param isIn true if the item is in the knapsack, false otherwise
+ */
+ public KnapsackAssignment(int itemId, boolean isIn) {
+ this.itemId = itemId;
+ this.isIn = isIn;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackCapacityPropagatorImpl.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackCapacityPropagatorImpl.java
index 0000000,357cc2a..e630b3c
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackCapacityPropagatorImpl.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackCapacityPropagatorImpl.java
@@@ -1,0 -1,218 +1,237 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.Comparator;
+
+ /**
+ * A knapsack propagator that constrains assignments based on knapsack capacity for a given
+ * dimension<br/>
+ * <br/>
+ * Based on the C++ knapsack solver in Google's or-tools package.
+ */
+ public class KnapsackCapacityPropagatorImpl extends AbstractKnapsackPropagator {
+ private static final long ALL_BITS_64 = 0xFFFFFFFFFFFFFFFFL;
+ private static final int NO_SELECTION = -1;
+
+ private long _capacity;
+ private long _consumedCapacity;
+ private int _breakItemId;
+ private ArrayList<KnapsackItem> _sortedItems;
+ private long _profitMax;
+
+ /**
+ * Initialize the propagator
+ * @param state the current knapsack state
+ * @param capacity the knapsack capacity for this dimension
+ */
+ public KnapsackCapacityPropagatorImpl(KnapsackState state, long capacity) {
+ super(state);
+ _capacity = capacity;
+ _consumedCapacity = 0L;
+ _breakItemId = NO_SELECTION;
+ _sortedItems = new ArrayList<KnapsackItem>();
+ _profitMax = 0L;
+ }
+
+ @Override
+ public void computeProfitBounds() {
+ setProfitLowerBound(currentProfit());
+ _breakItemId = NO_SELECTION;
+
+ long remainingCapacity = _capacity - _consumedCapacity;
+ int breakSortedItemId = NO_SELECTION;
+ final int numberOfSortedItems = _sortedItems.size();
+ for (int sortedId = 0; sortedId < numberOfSortedItems; sortedId++) {
+ final KnapsackItem item = _sortedItems.get(sortedId);
+ if (!state().isBound(item.id)) {
+ _breakItemId = item.id;
+
+ if (remainingCapacity >= item.weight) {
+ remainingCapacity -= item.weight;
+ setProfitLowerBound(profitLowerBound() + item.profit);
+ } else {
+ breakSortedItemId = sortedId;
+ break;
+ }
+ }
+ }
+ setProfitUpperBound(profitLowerBound());
+ if (breakSortedItemId != NO_SELECTION) {
+ final long additionalProfit = getAdditionalProfit(remainingCapacity, breakSortedItemId);
+ setProfitUpperBound(profitUpperBound() + additionalProfit);
+ }
+ }
+
+ @Override
+ public int getNextItemId() {
+ return _breakItemId;
+ }
+
+ @Override
+ protected void initPropagator() {
+ _consumedCapacity = 0L;
+ _breakItemId = NO_SELECTION;
+ _sortedItems = new ArrayList<KnapsackItem>(items());
+ _profitMax = 0L;
+ for (KnapsackItem item : _sortedItems) {
+ _profitMax = Math.max(_profitMax, item.profit);
+ }
+ _profitMax++;
+ Collections.sort(_sortedItems, new KnapsackItemDecreasingEfficiencyComparator(_profitMax));
+ }
+
+ @Override
+ protected boolean updatePropagator(boolean revert, KnapsackAssignment assignment) {
+ if (assignment.isIn) {
+ if (revert) {
+ _consumedCapacity -= items().get(assignment.itemId).weight;
+ } else {
+ _consumedCapacity += items().get(assignment.itemId).weight;
+ if (_consumedCapacity > _capacity) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ @Override
+ protected void copyCurrentStateToSolutionPropagator(ArrayList<Boolean> solution) {
+ if (solution == null) {
+ throw new RuntimeException("solution cannot be null!");
+ }
+ long remainingCapacity = _capacity - _consumedCapacity;
+ for (KnapsackItem item : _sortedItems) {
+ if (!state().isBound(item.id)) {
+ if (remainingCapacity >= item.weight) {
+ remainingCapacity -= item.weight;
+ solution.set(item.id, true);
+ } else {
+ return;
+ }
+ }
+ }
+ }
+
+ private long getAdditionalProfit(long remainingCapacity, int breakItemId) {
+ final int afterBreakItemId = breakItemId + 1;
+ long additionalProfitWhenNoBreakItem = 0L;
+ if (afterBreakItemId < _sortedItems.size()) {
+ final long nextWeight = _sortedItems.get(afterBreakItemId).weight;
+ final long nextProfit = _sortedItems.get(afterBreakItemId).profit;
+ additionalProfitWhenNoBreakItem =
+ upperBoundOfRatio(remainingCapacity, nextProfit, nextWeight);
+ }
+
+ final int beforeBreakItemId = breakItemId - 1;
+ long additionalProfitWhenBreakItem = 0L;
+ if (beforeBreakItemId >= 0) {
+ final long previousWeight = _sortedItems.get(beforeBreakItemId).weight;
+ if (previousWeight != 0) {
+ final long previousProfit = _sortedItems.get(beforeBreakItemId).profit;
+ final long overusedCapacity = _sortedItems.get(breakItemId).weight - remainingCapacity;
+ final long ratio = upperBoundOfRatio(overusedCapacity, previousProfit, previousWeight);
+
+ additionalProfitWhenBreakItem = _sortedItems.get(breakItemId).profit - ratio;
+ }
+ }
+
+ final long additionalProfit =
+ Math.max(additionalProfitWhenNoBreakItem, additionalProfitWhenBreakItem);
+ return additionalProfit;
+ }
+
+ private int mostSignificantBitsPosition64(long n) {
+ int b = 0;
+ if (0 != (n & (ALL_BITS_64 << (1 << 5)))) {
+ b |= (1 << 5);
+ n >>= (1 << 5);
+ }
+ if (0 != (n & (ALL_BITS_64 << (1 << 4)))) {
+ b |= (1 << 4);
+ n >>= (1 << 4);
+ }
+ if (0 != (n & (ALL_BITS_64 << (1 << 3)))) {
+ b |= (1 << 3);
+ n >>= (1 << 3);
+ }
+ if (0 != (n & (ALL_BITS_64 << (1 << 2)))) {
+ b |= (1 << 2);
+ n >>= (1 << 2);
+ }
+ if (0 != (n & (ALL_BITS_64 << (1 << 1)))) {
+ b |= (1 << 1);
+ n >>= (1 << 1);
+ }
+ if (0 != (n & (ALL_BITS_64 << (1 << 0)))) {
+ b |= (1 << 0);
+ }
+ return b;
+ }
+
+ private boolean willProductOverflow(long value1, long value2) {
+ final int mostSignificantBitsPosition1 = mostSignificantBitsPosition64(value1);
+ final int mostSignificantBitsPosition2 = mostSignificantBitsPosition64(value2);
+ final int overflow = 61;
+ return mostSignificantBitsPosition1 + mostSignificantBitsPosition2 > overflow;
+ }
+
+ private long upperBoundOfRatio(long numerator1, long numerator2, long denominator) {
+ if (!willProductOverflow(numerator1, numerator2)) {
+ final long numerator = numerator1 * numerator2;
+ final long result = numerator / denominator;
+ return result;
+ } else {
+ final double ratio = (((double) numerator1) * ((double) numerator2)) / ((double) denominator);
+ final long result = ((long) Math.floor(ratio + 0.5));
+ return result;
+ }
+ }
+
+ /**
+ * A special comparator that orders knapsack items by decreasing efficiency (profit to weight
+ * ratio)
+ */
+ private static class KnapsackItemDecreasingEfficiencyComparator implements
+ Comparator<KnapsackItem> {
+ private final long _profitMax;
+
+ public KnapsackItemDecreasingEfficiencyComparator(long profitMax) {
+ _profitMax = profitMax;
+ }
+
+ @Override
+ public int compare(KnapsackItem item1, KnapsackItem item2) {
+ double eff1 = item1.getEfficiency(_profitMax);
+ double eff2 = item2.getEfficiency(_profitMax);
+ if (eff1 < eff2) {
+ return 1;
+ } else if (eff1 > eff2) {
+ return -1;
+ } else {
+ return 0;
+ }
+ }
+
+ }
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackGenericSolverImpl.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackGenericSolverImpl.java
index 0000000,1bf1d3f..dec9b2d
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackGenericSolverImpl.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackGenericSolverImpl.java
@@@ -1,0 -1,269 +1,288 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.ArrayList;
+ import java.util.Comparator;
+ import java.util.PriorityQueue;
+
+ /**
+ * A generic knapsack solver that supports multiple dimensions<br/>
+ * <br/>
+ * Based on the C++ knapsack solver in Google's or-tools package.
+ */
+ public class KnapsackGenericSolverImpl extends AbstractBaseKnapsackSolver {
+ private static final int MASTER_PROPAGATOR_ID = 0;
+ private static final int NO_SELECTION = -1;
+
+ private ArrayList<KnapsackPropagator> _propagators;
+ private int _masterPropagatorId;
+ private ArrayList<KnapsackSearchNode> _searchNodes;
+ private KnapsackState _state;
+ private long _bestSolutionProfit;
+ private ArrayList<Boolean> _bestSolution;
+
+ /**
+ * Create the solver
+ * @param solverName name of the solver
+ */
+ public KnapsackGenericSolverImpl(String solverName) {
+ super(solverName);
+ _propagators = new ArrayList<KnapsackPropagator>();
+ _masterPropagatorId = MASTER_PROPAGATOR_ID;
+ _searchNodes = new ArrayList<KnapsackSearchNode>();
+ _state = new KnapsackStateImpl();
+ _bestSolutionProfit = 0L;
+ _bestSolution = new ArrayList<Boolean>();
+ }
+
+ @Override
+ public void init(ArrayList<Long> profits, ArrayList<ArrayList<Long>> weights,
+ ArrayList<Long> capacities) {
+ clear();
+ final int numberOfItems = profits.size();
+ final int numberOfDimensions = weights.size();
+ _state.init(numberOfItems);
+
+ _bestSolution.clear();
+ for (int i = 0; i < numberOfItems; i++) {
+ _bestSolution.add(false);
+ }
+
+ for (int i = 0; i < numberOfDimensions; i++) {
+ KnapsackPropagator propagator = new KnapsackCapacityPropagatorImpl(_state, capacities.get(i));
+ propagator.init(profits, weights.get(i));
+ _propagators.add(propagator);
+ }
+ _masterPropagatorId = MASTER_PROPAGATOR_ID;
+ }
+
+ public int getNumberOfItems() {
+ return _state.getNumberOfItems();
+ }
+
+ @Override
+ public long[] getLowerAndUpperBoundWhenItem(int itemId, boolean isItemIn, long lowerBound,
+ long upperBound) {
+ long[] result = {
+ lowerBound, upperBound
+ };
+ KnapsackAssignment assignment = new KnapsackAssignment(itemId, isItemIn);
+ final boolean fail = !incrementalUpdate(false, assignment);
+ if (fail) {
+ result[0] = 0L;
+ result[1] = 0L;
+ } else {
+ result[0] =
+ (hasOnePropagator()) ? _propagators.get(_masterPropagatorId).profitLowerBound() : 0L;
+ result[1] = getAggregatedProfitUpperBound();
+ }
+
+ final boolean failRevert = !incrementalUpdate(true, assignment);
+ if (failRevert) {
+ result[0] = 0L;
+ result[1] = 0L;
+ }
+ return result;
+ }
+
+ public void setMasterPropagatorId(int masterPropagatorId) {
+ _masterPropagatorId = masterPropagatorId;
+ }
+
+ @Override
+ public long solve() {
+ _bestSolutionProfit = 0L;
+ PriorityQueue<KnapsackSearchNode> searchQueue =
+ new PriorityQueue<KnapsackSearchNode>(11,
+ new KnapsackSearchNodeInDecreasingUpperBoundComparator());
+ KnapsackAssignment assignment = new KnapsackAssignment(NO_SELECTION, true);
+ KnapsackSearchNode rootNode = new KnapsackSearchNodeImpl(null, assignment);
+ rootNode.setCurrentProfit(getCurrentProfit());
+ rootNode.setProfitUpperBound(getAggregatedProfitUpperBound());
+ rootNode.setNextItemId(getNextItemId());
+ _searchNodes.add(rootNode);
+
+ if (makeNewNode(rootNode, false)) {
+ searchQueue.add(_searchNodes.get(_searchNodes.size() - 1));
+ }
+ if (makeNewNode(rootNode, true)) {
+ searchQueue.add(_searchNodes.get(_searchNodes.size() - 1));
+ }
+
+ KnapsackSearchNode currentNode = rootNode;
+ while (!searchQueue.isEmpty() && searchQueue.peek().profitUpperBound() > _bestSolutionProfit) {
+ KnapsackSearchNode node = searchQueue.poll();
+
+ // TODO: check if equality is enough
+ if (node != currentNode) {
+ KnapsackSearchPath path = new KnapsackSearchPathImpl(currentNode, node);
+ path.init();
+ final boolean noFail = updatePropagators(path);
+ currentNode = node;
+ if (!noFail) {
+ throw new RuntimeException("solver failed to update propagators");
+ }
+ }
+
+ if (makeNewNode(node, false)) {
+ searchQueue.add(_searchNodes.get(_searchNodes.size() - 1));
+ }
+ if (makeNewNode(node, true)) {
+ searchQueue.add(_searchNodes.get(_searchNodes.size() - 1));
+ }
+ }
+ return _bestSolutionProfit;
+ }
+
+ @Override
+ public boolean bestSolution(int itemId) {
+ return _bestSolution.get(itemId);
+ }
+
+ private void clear() {
+ _propagators.clear();
+ _searchNodes.clear();
+ }
+
+ private boolean updatePropagators(final KnapsackSearchPath path) {
+ boolean noFail = true;
+ KnapsackSearchNode node = path.from();
+ KnapsackSearchNode via = path.via();
+ while (node != via) {
+ noFail = incrementalUpdate(true, node.assignment()) && noFail;
+ node = node.parent();
+ }
+ node = path.to();
+ while (node != via) {
+ noFail = incrementalUpdate(false, node.assignment()) && noFail;
+ node = node.parent();
+ }
+ return noFail;
+ }
+
+ private boolean incrementalUpdate(boolean revert, final KnapsackAssignment assignment) {
+ boolean noFail = _state.updateState(revert, assignment);
+ for (KnapsackPropagator propagator : _propagators) {
+ noFail = propagator.update(revert, assignment) && noFail;
+ }
+ return noFail;
+ }
+
+ private void updateBestSolution() {
+ final long profitLowerBound =
+ (hasOnePropagator()) ? _propagators.get(_masterPropagatorId).profitLowerBound()
+ : _propagators.get(_masterPropagatorId).currentProfit();
+
+ if (_bestSolutionProfit < profitLowerBound) {
+ _bestSolutionProfit = profitLowerBound;
+ _propagators.get(_masterPropagatorId).copyCurrentStateToSolution(hasOnePropagator(),
+ _bestSolution);
+ }
+ }
+
+ private boolean makeNewNode(final KnapsackSearchNode node, boolean isIn) {
+ if (node.nextItemId() == NO_SELECTION) {
+ return false;
+ }
+ KnapsackAssignment assignment = new KnapsackAssignment(node.nextItemId(), isIn);
+ KnapsackSearchNode newNode = new KnapsackSearchNodeImpl(node, assignment);
+
+ KnapsackSearchPath path = new KnapsackSearchPathImpl(node, newNode);
+ path.init();
+ final boolean noFail = updatePropagators(path);
+ if (noFail) {
+ newNode.setCurrentProfit(getCurrentProfit());
+ newNode.setProfitUpperBound(getAggregatedProfitUpperBound());
+ newNode.setNextItemId(getNextItemId());
+ updateBestSolution();
+ }
+
+ KnapsackSearchPath revertPath = new KnapsackSearchPathImpl(newNode, node);
+ revertPath.init();
+ updatePropagators(revertPath);
+
+ if (!noFail || newNode.profitUpperBound() < _bestSolutionProfit) {
+ return false;
+ }
+
+ KnapsackSearchNode relevantNode = new KnapsackSearchNodeImpl(node, assignment);
+ relevantNode.setCurrentProfit(newNode.currentProfit());
+ relevantNode.setProfitUpperBound(newNode.profitUpperBound());
+ relevantNode.setNextItemId(newNode.nextItemId());
+ _searchNodes.add(relevantNode);
+
+ return true;
+ }
+
+ private long getAggregatedProfitUpperBound() {
+ long upperBound = Long.MAX_VALUE;
+ for (KnapsackPropagator propagator : _propagators) {
+ propagator.computeProfitBounds();
+ final long propagatorUpperBound = propagator.profitUpperBound();
+ upperBound = Math.min(upperBound, propagatorUpperBound);
+ }
+ return upperBound;
+ }
+
+ private boolean hasOnePropagator() {
+ return _propagators.size() == 1;
+ }
+
+ private long getCurrentProfit() {
+ return _propagators.get(_masterPropagatorId).currentProfit();
+ }
+
+ private int getNextItemId() {
+ return _propagators.get(_masterPropagatorId).getNextItemId();
+ }
+
+ /**
+ * A special comparator that orders knapsack search nodes in decreasing potential profit order
+ */
+ // TODO: check order
+ private static class KnapsackSearchNodeInDecreasingUpperBoundComparator implements
+ Comparator<KnapsackSearchNode> {
+ @Override
+ public int compare(KnapsackSearchNode node1, KnapsackSearchNode node2) {
+ final long profitUpperBound1 = node1.profitUpperBound();
+ final long profitUpperBound2 = node2.profitUpperBound();
+ if (profitUpperBound1 == profitUpperBound2) {
+ final long currentProfit1 = node1.currentProfit();
+ final long currentProfit2 = node2.currentProfit();
+ if (currentProfit1 > currentProfit2) {
+ return -1;
+ } else if (currentProfit1 < currentProfit2) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+ if (profitUpperBound1 > profitUpperBound2) {
+ return -1;
+ } else if (profitUpperBound1 < profitUpperBound2) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
+ }
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackItem.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackItem.java
index 0000000,3996816..70824a9
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackItem.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackItem.java
@@@ -1,0 -1,33 +1,52 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ /**
+ * Basic structure of an item in a knapsack<br/>
+ * <br/>
+ * Based on the C++ knapsack solver in Google's or-tools package.
+ */
+ public class KnapsackItem {
+ public final int id;
+ public final long weight;
+ public final long profit;
+
+ /**
+ * Initialize the item
+ * @param id the item id
+ * @param weight the cost to place the item in the knapsack for one dimension
+ * @param profit the benefit of placing the item in the knapsack
+ */
+ public KnapsackItem(int id, long weight, long profit) {
+ this.id = id;
+ this.weight = weight;
+ this.profit = profit;
+ }
+
+ /**
+ * Get the profit to weight ratio
+ * @param profitMax the maximum possible profit for this item
+ * @return the item addition effciency
+ */
+ public double getEfficiency(long profitMax) {
+ return (weight > 0) ? ((double) profit) / ((double) weight) : ((double) profitMax);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackPropagator.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackPropagator.java
index 0000000,702bf1e..cb3eca7
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackPropagator.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackPropagator.java
@@@ -1,0 -1,61 +1,80 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.ArrayList;
+
+ /**
+ * Constraint enforcer for a single dimenstion on a knapsack solution search<br/>
+ * <br/>
+ * Based on the C++ knapsack solver in Google's or-tools package.
+ */
+ public interface KnapsackPropagator {
+ /**
+ * Initialize the propagator
+ * @param profits profits for selecting each item
+ * @param weights weights of each item for this dimension
+ */
+ void init(final ArrayList<Long> profits, final ArrayList<Long> weights);
+
+ /**
+ * Update the search
+ * @param revert revert the assignment
+ * @param assignment the assignment to use for the update
+ * @return true if successful, false if failed
+ */
+ boolean update(boolean revert, final KnapsackAssignment assignment);
+
+ /**
+ * Compute the upper and lower bounds of potential profits
+ */
+ void computeProfitBounds();
+
+ /**
+ * Get the next item to use in the search
+ * @return item id
+ */
+ int getNextItemId();
+
+ /**
+ * Get the current profit of the search
+ * @return current profit
+ */
+ long currentProfit();
+
+ /**
+ * Get the lowest possible profit of the search
+ * @return profit lower bound
+ */
+ long profitLowerBound();
+
+ /**
+ * Get the highest possible profit of the search
+ * @return profit upper bound
+ */
+ long profitUpperBound();
+
+ /**
+ * Copy the current computed state to the final solution
+ * @param hasOnePropagator true if there is only one propagator, i.e. 1 dimension
+ * @param solution the solution vector
+ */
+ void copyCurrentStateToSolution(boolean hasOnePropagator, ArrayList<Boolean> solution);
+ }