You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/10 19:05:32 UTC
[49/50] [abbrv] Merge remote-tracking branch
'origin/helix-provisioning'
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSearchNode.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSearchNode.java
index 0000000,1ac8061..1ae635b
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSearchNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSearchNode.java
@@@ -1,0 -1,62 +1,81 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ /**
+ * A node of the knapsack search tree: an assignment plus the profit information known at that point<br/>
+ * <br/>
+ * Based on the C++ knapsack solver in Google's or-tools package.
+ */
+ public interface KnapsackSearchNode {
+ /**
+ * Depth of the node in the search tree
+ * @return node depth; {@link KnapsackSearchNodeImpl} uses 0 for a parentless node and parent depth + 1 otherwise
+ */
+ int depth();
+
+ /**
+ * The parent node in this search
+ * @return the node's immediate parent; null for a node created without a parent in {@link KnapsackSearchNodeImpl}
+ */
+ KnapsackSearchNode parent();
+
+ /**
+ * The assignment associated with this node
+ * @return KnapsackAssignment instance
+ */
+ KnapsackAssignment assignment();
+
+ /**
+ * The current profit with this node and search
+ * @return current profit; starts at 0 in {@link KnapsackSearchNodeImpl} until set
+ */
+ long currentProfit();
+
+ /**
+ * Set the current profit with this node and search
+ * @param profit current profit
+ */
+ void setCurrentProfit(long profit);
+
+ /**
+ * The maximum possible profit with this node and search
+ * @return profit upper bound; starts at Long.MAX_VALUE in {@link KnapsackSearchNodeImpl} until set
+ */
+ long profitUpperBound();
+
+ /**
+ * Set the maximum possible profit with this node and search
+ * @param profit profit upper bound
+ */
+ void setProfitUpperBound(long profit);
+
+ /**
+ * The next item given this node and search
+ * @return next item id; starts at -1 (no selection) in {@link KnapsackSearchNodeImpl} until set
+ */
+ int nextItemId();
+
+ /**
+ * Set the next item given this node and search
+ * @param id next item id
+ */
+ void setNextItemId(int id);
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSearchNodeImpl.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSearchNodeImpl.java
index 0000000,ea9cb98..aeba786
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSearchNodeImpl.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSearchNodeImpl.java
@@@ -1,0 -1,77 +1,96 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ /**
+ * Implementation of {@link KnapsackSearchNode} that stores every property in a plain field<br/>
+ * <br/>
+ * Based on the C++ knapsack solver in Google's or-tools package.
+ */
+ public class KnapsackSearchNodeImpl implements KnapsackSearchNode {
+ private static final int NO_SELECTION = -1; // initial value of the next item id
+
+ private int _depth;
+ private KnapsackSearchNode _parent;
+ private KnapsackAssignment _assignment;
+ private long _currentProfit;
+ private long _profitUpperBound;
+ private int _nextItemId;
+
+ /**
+ * Initialize a search node with profit 0, an upper bound of Long.MAX_VALUE and no next item selected
+ * @param parent the node's parent, or null for a root node (depth 0)
+ * @param assignment the node's assignment
+ */
+ public KnapsackSearchNodeImpl(final KnapsackSearchNode parent, final KnapsackAssignment assignment) {
+ _depth = (parent == null) ? 0 : parent.depth() + 1; // depth is fixed at construction time
+ _parent = parent;
+ _assignment = assignment;
+ _currentProfit = 0L;
+ _profitUpperBound = Long.MAX_VALUE;
+ _nextItemId = NO_SELECTION;
+ }
+
+ @Override
+ public int depth() {
+ return _depth;
+ }
+
+ @Override
+ public KnapsackSearchNode parent() {
+ return _parent;
+ }
+
+ @Override
+ public KnapsackAssignment assignment() {
+ return _assignment;
+ }
+
+ @Override
+ public long currentProfit() {
+ return _currentProfit;
+ }
+
+ @Override
+ public void setCurrentProfit(long profit) {
+ _currentProfit = profit;
+ }
+
+ @Override
+ public long profitUpperBound() {
+ return _profitUpperBound;
+ }
+
+ @Override
+ public void setProfitUpperBound(long profit) {
+ _profitUpperBound = profit;
+ }
+
+ @Override
+ public int nextItemId() {
+ return _nextItemId;
+ }
+
+ @Override
+ public void setNextItemId(int id) {
+ _nextItemId = id;
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSearchPath.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSearchPath.java
index 0000000,d977143..012e9c0
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSearchPath.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSearchPath.java
@@@ -1,0 -1,39 +1,58 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ /**
+ * A path between two search nodes of a knapsack, passing through an intermediate node<br/>
+ * <br/>
+ * Based on the C++ knapsack solver in Google's or-tools package.
+ */
+ public interface KnapsackSearchPath {
+ /**
+ * Initialize the path; in {@link KnapsackSearchPathImpl} this computes the node returned by {@link #via()}
+ */
+ void init();
+
+ /**
+ * Get the source node
+ * @return starting KnapsackSearchNode
+ */
+ KnapsackSearchNode from();
+
+ /**
+ * Get the intermediate node
+ * @return KnapsackSearchNode between source and destination; null in {@link KnapsackSearchPathImpl} until init() has run
+ */
+ KnapsackSearchNode via();
+
+ /**
+ * Get the destination node
+ * @return terminating KnapsackSearchNode
+ */
+ KnapsackSearchNode to();
+
+ /**
+ * Get an ancestor of a given search node
+ * @param node the search node
+ * @param depth the depth of the ancestor
+ * @return the ancestor node; {@link KnapsackSearchPathImpl} returns the node itself when it is not deeper than depth
+ */
+ KnapsackSearchNode moveUpToDepth(final KnapsackSearchNode node, int depth);
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSearchPathImpl.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSearchPathImpl.java
index 0000000,06a9ec7..1e02768
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSearchPathImpl.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSearchPathImpl.java
@@@ -1,0 -1,65 +1,84 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ /**
+ * Implementation of {@link KnapsackSearchPath} that finds the intermediate node by walking parent links<br/>
+ * <br/>
+ * Based on the C++ knapsack solver in Google's or-tools package.
+ */
+ public class KnapsackSearchPathImpl implements KnapsackSearchPath {
+ private KnapsackSearchNode _from;
+ private KnapsackSearchNode _via; // stays null until init() is called
+ private KnapsackSearchNode _to;
+
+ /**
+ * Create a search path between nodes in a knapsack; call init() afterwards to compute via()
+ * @param from the source node
+ * @param to the destination node
+ */
+ public KnapsackSearchPathImpl(final KnapsackSearchNode from, final KnapsackSearchNode to) {
+ _from = from;
+ _via = null;
+ _to = to;
+ }
+
+ @Override
+ public void init() {
+ KnapsackSearchNode nodeFrom = moveUpToDepth(_from, _to.depth()); // lift the deeper endpoint to the shallower one's depth
+ KnapsackSearchNode nodeTo = moveUpToDepth(_to, _from.depth());
+ if (nodeFrom.depth() != nodeTo.depth()) {
+ throw new RuntimeException("to and from depths do not match!");
+ }
+
+ // Find common parent by stepping both nodes up one level at a time until they are the same node
+ // TODO: check if basic equality is enough (the loop below compares references, not equals())
+ while (nodeFrom != nodeTo) {
+ nodeFrom = nodeFrom.parent();
+ nodeTo = nodeTo.parent();
+ }
+ _via = nodeFrom;
+ }
+
+ @Override
+ public KnapsackSearchNode from() {
+ return _from;
+ }
+
+ @Override
+ public KnapsackSearchNode via() {
+ return _via;
+ }
+
+ @Override
+ public KnapsackSearchNode to() {
+ return _to;
+ }
+
+ @Override
+ public KnapsackSearchNode moveUpToDepth(KnapsackSearchNode node, int depth) {
+ KnapsackSearchNode currentNode = node;
+ while (currentNode.depth() > depth) { // a node already at or above the requested depth is returned unchanged
+ currentNode = currentNode.parent();
+ }
+ return currentNode;
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSolver.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSolver.java
index 0000000,832a470..624082b
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSolver.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSolver.java
@@@ -1,0 -1,60 +1,79 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.ArrayList;
+
+ /**
+ * Interface for a factory of multidimensional 0-1 knapsack solvers that support reductions<br/>
+ * <br/>
+ * Based on the C++ knapsack solver in Google's or-tools package.
+ */
+ public interface KnapsackSolver {
+ /**
+ * Collection of supported algorithms
+ */
+ enum SolverType {
+ /**
+ * A solver that uses the branch-and-bound technique, supports multiple dimensions
+ */
+ KNAPSACK_MULTIDIMENSION_BRANCH_AND_BOUND_SOLVER
+ }
+
+ /**
+ * Initialize the solver
+ * @param profits profit for each element if selected
+ * @param weights cost of each element in each dimension; KnapsackSolverImpl reads it as one list per dimension, each indexed by item
+ * @param capacities maximum total weight in each dimension
+ */
+ void init(final ArrayList<Long> profits, final ArrayList<ArrayList<Long>> weights,
+ final ArrayList<Long> capacities);
+
+ /**
+ * Solve the knapsack problem
+ * @return the total profit of the solution found (KnapsackSolverImpl adds the profit of items fixed by reduction to the underlying solver's result)
+ */
+ long solve();
+
+ /**
+ * Check if an element was selected in the optimal solution
+ * @param itemId the index of the element to check, as passed to init()
+ * @return true if the item is present, false otherwise
+ */
+ boolean bestSolutionContains(int itemId);
+
+ /**
+ * Get the name of this solver
+ * @return solver name
+ */
+ String getName();
+
+ /**
+ * Check if a reduction should be used to prune paths early on
+ * @return true if reduction enabled, false otherwise
+ */
+ boolean useReduction();
+
+ /**
+ * Set whether a reduction should be used to prune paths early on; KnapsackSolverImpl performs the reduction inside init()
+ * @param useReduction true to enable, false to disable
+ */
+ void setUseReduction(boolean useReduction);
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSolverImpl.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSolverImpl.java
index 0000000,eeab0b1..2d521f0
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSolverImpl.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackSolverImpl.java
@@@ -1,0 -1,191 +1,210 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.ArrayList;
+
+ /**
+ * Implementation of {@link KnapsackSolver} that delegates to a BaseKnapsackSolver and can first fix items through a bound-based reduction<br/>
+ * <br/>
+ * Based on the C++ knapsack solver in Google's or-tools package.
+ */
+ public class KnapsackSolverImpl implements KnapsackSolver {
+ private final BaseKnapsackSolver _solver; // underlying solver that performs the actual search
+ private final ArrayList<Boolean> _knownValue; // per original item: true once reduction has fixed its value
+ private final ArrayList<Boolean> _bestSolution; // per original item: the fixed value, meaningful only when _knownValue is true
+ private final ArrayList<Integer> _mappingReducedItemId; // original item id -> item id in the reduced problem
+ private boolean _isProblemSolved; // true when reduction fixed every item
+ private long _additionalProfit; // profit of the items that reduction fixed as selected
+ private boolean _useReduction; // enabled by default in both constructors
+
+ /**
+ * Create a solver backed by KnapsackGenericSolverImpl, with reduction enabled
+ * @param solverName the name of the solver
+ */
+ public KnapsackSolverImpl(String solverName) {
+ _solver = new KnapsackGenericSolverImpl(solverName);
+ _knownValue = new ArrayList<Boolean>();
+ _bestSolution = new ArrayList<Boolean>();
+ _mappingReducedItemId = new ArrayList<Integer>();
+ _isProblemSolved = false;
+ _additionalProfit = 0L;
+ _useReduction = true;
+ }
+
+ /**
+ * Create a solver of the given type, with reduction enabled; a RuntimeException is thrown for an unsupported type
+ * @param solverType the type of solver
+ * @param solverName the name of the solver
+ */
+ public KnapsackSolverImpl(SolverType solverType, String solverName) {
+ _knownValue = new ArrayList<Boolean>();
+ _bestSolution = new ArrayList<Boolean>();
+ _mappingReducedItemId = new ArrayList<Integer>();
+ _isProblemSolved = false;
+ _additionalProfit = 0L;
+ _useReduction = true;
+ BaseKnapsackSolver solver = null;
+ switch (solverType) {
+ case KNAPSACK_MULTIDIMENSION_BRANCH_AND_BOUND_SOLVER:
+ solver = new KnapsackGenericSolverImpl(solverName);
+ break;
+ default:
+ throw new RuntimeException("Solver " + solverType + " not supported");
+ }
+ _solver = solver;
+ }
+
+ @Override
+ public void init(ArrayList<Long> profits, ArrayList<ArrayList<Long>> weights,
+ ArrayList<Long> capacities) {
+ _additionalProfit = 0L;
+ _isProblemSolved = false;
+ _solver.init(profits, weights, capacities); // the full problem is loaded first; reduceProblem queries bounds from it
+ if (_useReduction) {
+ final int numItems = profits.size();
+ final int numReducedItems = reduceProblem(numItems);
+
+ if (numReducedItems > 0) { // some items were fixed: record the profit of those fixed as selected
+ computeAdditionalProfit(profits);
+ }
+
+ if (numReducedItems > 0 && numReducedItems < numItems) { // only part of the items were fixed: re-init the solver on the rest
+ initReducedProblem(profits, weights, capacities);
+ }
+ }
+ }
+
+ @Override
+ public long solve() {
+ return _additionalProfit + ((_isProblemSolved) ? 0 : _solver.solve()); // the search is skipped when reduction fixed every item
+ }
+
+ @Override
+ public boolean bestSolutionContains(int itemId) {
+ final int mappedItemId = (_useReduction) ? _mappingReducedItemId.get(itemId) : itemId; // NOTE(review): relies on lists filled by reduceProblem(), i.e. init() having run with reduction enabled - confirm callers
+ return (_useReduction && _knownValue.get(itemId)) ? _bestSolution.get(itemId) : _solver
+ .bestSolution(mappedItemId);
+ }
+
+ @Override
+ public String getName() {
+ return _solver.getName();
+ }
+
+ @Override
+ public boolean useReduction() {
+ return _useReduction;
+ }
+
+ @Override
+ public void setUseReduction(boolean useReduction) {
+ _useReduction = useReduction;
+ }
+
+ private int reduceProblem(int numItems) { // fixes items whose value is forced by the profit bounds; returns how many were fixed
+ _knownValue.clear();
+ _bestSolution.clear();
+ _mappingReducedItemId.clear();
+ ArrayList<Long> j0UpperBounds = new ArrayList<Long>();
+ ArrayList<Long> j1UpperBounds = new ArrayList<Long>();
+ for (int i = 0; i < numItems; i++) { // start with nothing fixed and an identity id mapping
+ _knownValue.add(false);
+ _bestSolution.add(false);
+ _mappingReducedItemId.add(i);
+ j0UpperBounds.add(Long.MAX_VALUE);
+ j1UpperBounds.add(Long.MAX_VALUE);
+ }
+ _additionalProfit = 0L;
+ long bestLowerBound = 0L;
+ for (int itemId = 0; itemId < numItems; itemId++) {
+ long upperBound = 0L; // NOTE(review): initial values and argument order below look swapped relative to the names - confirm against BaseKnapsackSolver.getLowerAndUpperBoundWhenItem
+ long lowerBound = Long.MAX_VALUE;
+ long[] bounds = _solver.getLowerAndUpperBoundWhenItem(itemId, false, upperBound, lowerBound);
+ lowerBound = bounds[0];
+ upperBound = bounds[1];
+ j1UpperBounds.set(itemId, upperBound); // upper bound obtained with the flag false (presumably item left out - TODO confirm)
+ bestLowerBound = Math.max(bestLowerBound, lowerBound);
+ bounds = _solver.getLowerAndUpperBoundWhenItem(itemId, true, upperBound, lowerBound);
+ lowerBound = bounds[0];
+ upperBound = bounds[1];
+ j0UpperBounds.set(itemId, upperBound); // upper bound obtained with the flag true (presumably item taken - TODO confirm)
+ bestLowerBound = Math.max(bestLowerBound, lowerBound);
+ }
+
+ int numReducedItems = 0;
+ for (int itemId = 0; itemId < numItems; itemId++) {
+ if (bestLowerBound > j0UpperBounds.get(itemId)) { // the flag-true bound cannot reach the best lower bound: fix the item as not selected
+ _knownValue.set(itemId, true);
+ _bestSolution.set(itemId, false);
+ numReducedItems++;
+ } else if (bestLowerBound > j1UpperBounds.get(itemId)) { // the flag-false bound cannot reach it: fix the item as selected
+ _knownValue.set(itemId, true);
+ _bestSolution.set(itemId, true);
+ numReducedItems++;
+ }
+ }
+ _isProblemSolved = numReducedItems == numItems;
+ return numReducedItems;
+ }
+
+ private void computeAdditionalProfit(final ArrayList<Long> profits) { // sums the profits of items fixed as selected into _additionalProfit
+ final int numItems = profits.size();
+ _additionalProfit = 0L;
+ for (int itemId = 0; itemId < numItems; itemId++) {
+ if (_knownValue.get(itemId) && _bestSolution.get(itemId)) {
+ _additionalProfit += profits.get(itemId);
+ }
+ }
+ }
+
+ private void initReducedProblem(final ArrayList<Long> profits, // re-inits the solver on the unfixed items only
+ final ArrayList<ArrayList<Long>> weights, final ArrayList<Long> capacities) {
+ final int numItems = profits.size();
+ final int numDimensions = capacities.size();
+
+ ArrayList<Long> reducedProfits = new ArrayList<Long>();
+ for (int itemId = 0; itemId < numItems; itemId++) {
+ if (!_knownValue.get(itemId)) {
+ _mappingReducedItemId.set(itemId, reducedProfits.size()); // new id is the item's position among the unfixed items
+ reducedProfits.add(profits.get(itemId));
+ }
+ }
+
+ ArrayList<ArrayList<Long>> reducedWeights = new ArrayList<ArrayList<Long>>();
+ ArrayList<Long> reducedCapacities = new ArrayList<Long>(capacities); // copy, so the caller's capacities are not modified
+ for (int dim = 0; dim < numDimensions; dim++) {
+ final ArrayList<Long> oneDimensionWeights = weights.get(dim); // weights are indexed by dimension first, then by item
+ ArrayList<Long> oneDimensionReducedWeights = new ArrayList<Long>();
+ for (int itemId = 0; itemId < numItems; itemId++) {
+ if (_knownValue.get(itemId)) {
+ if (_bestSolution.get(itemId)) { // an item fixed as selected consumes capacity up front
+ reducedCapacities
+ .set(dim, reducedCapacities.get(dim) - oneDimensionWeights.get(itemId));
+ }
+ } else {
+ oneDimensionReducedWeights.add(oneDimensionWeights.get(itemId));
+ }
+ }
+ reducedWeights.add(oneDimensionReducedWeights);
+ }
+ _solver.init(reducedProfits, reducedWeights, reducedCapacities);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackState.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackState.java
index 0000000,66713eb..c07ebf7
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackState.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackState.java
@@@ -1,0 -1,42 +1,61 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ /**
+ * The current state of the knapsack: which items have an assignment recorded and with which value<br/>
+ * <br/>
+ * Based on the C++ knapsack solver in Google's or-tools package.
+ */
+ public interface KnapsackState {
+ /**
+ * Initialize the knapsack with the number of items; KnapsackStateImpl resets every item to unbound and not in
+ * @param numberOfItems the number of items
+ */
+ void init(int numberOfItems);
+
+ /**
+ * Update this state with an assignment
+ * @param revert true to revert to the previous state, false otherwise
+ * @param assignment the assignment that was made
+ * @return true on success, false on failure (KnapsackStateImpl fails when the item is already bound with the opposite value)
+ */
+ boolean updateState(boolean revert, final KnapsackAssignment assignment);
+
+ /**
+ * Get the number of items tracked by this state (KnapsackStateImpl returns the count given to init, not only the items that are in)
+ * @return number of items
+ */
+ int getNumberOfItems();
+
+ /**
+ * Check if an item currently has an assignment recorded
+ * @param id the item id
+ * @return true if bound, false otherwise
+ */
+ boolean isBound(int id);
+
+ /**
+ * Check if an item's recorded assignment places it in the knapsack
+ * @param id the item id
+ * @return true if inside, false otherwise
+ */
+ boolean isIn(int id);
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackStateImpl.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackStateImpl.java
index 0000000,8e86872..1fcd797
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackStateImpl.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/knapsack/KnapsackStateImpl.java
@@@ -1,0 -1,61 +1,80 @@@
+ package org.apache.helix.controller.strategy.knapsack;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.ArrayList;
+
+ /**
+ * Implementation of {@link KnapsackState} backed by two per-item boolean lists<br/>
+ * <br/>
+ * Based on the C++ knapsack solver in Google's or-tools package.
+ */
+ public class KnapsackStateImpl implements KnapsackState {
+ private ArrayList<Boolean> _isBound; // per item: whether an assignment is currently recorded
+ private ArrayList<Boolean> _isIn; // per item: the value of the last recorded assignment
+
+ /**
+ * Initialize the knapsack state with no items; call init(int) to size it
+ */
+ public KnapsackStateImpl() {
+ _isBound = new ArrayList<Boolean>();
+ _isIn = new ArrayList<Boolean>();
+ }
+
+ @Override
+ public void init(int numberOfItems) {
+ _isBound.clear();
+ _isIn.clear();
+ for (int i = 0; i < numberOfItems; i++) { // every item starts unbound and not in
+ _isBound.add(false);
+ _isIn.add(false);
+ }
+ }
+
+ @Override
+ public boolean updateState(boolean revert, KnapsackAssignment assignment) {
+ if (revert) {
+ _isBound.set(assignment.itemId, false); // reverting only unbinds the item; _isIn keeps its previous value
+ } else {
+ if (_isBound.get(assignment.itemId) && _isIn.get(assignment.itemId) != assignment.isIn) { // already bound with the opposite value: reject
+ return false;
+ }
+ _isBound.set(assignment.itemId, true);
+ _isIn.set(assignment.itemId, assignment.isIn);
+ }
+ return true;
+ }
+
+ @Override
+ public int getNumberOfItems() {
+ return _isBound.size(); // number of tracked items, as sized by init()
+ }
+
+ @Override
+ public boolean isBound(int id) {
+ return _isBound.get(id);
+ }
+
+ @Override
+ public boolean isIn(int id) {
+ return _isIn.get(id);
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 6e30074,39ea1c5..6dc5541
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@@ -164,7 -164,7 +164,7 @@@ public class ZKHelixAdmin implements He
PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
ConfigScopeProperty.PARTICIPANT.toString(), instanceName);
if (!_zkClient.exists(instanceConfigPath)) {
-- throw new HelixException("instance" + instanceName + " does not exist in cluster "
++ throw new HelixException("instance " + instanceName + " does not exist in cluster "
+ clusterName);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
index 5e2daa6,63f5776..a7a7088
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
@@@ -30,11 -19,19 +19,24 @@@ package org.apache.helix.model
* under the License.
*/
++import java.util.Map;
++
+ import org.apache.helix.HelixProperty;
+ import org.apache.helix.ZNRecord;
+ import org.apache.helix.api.config.NamespacedConfig;
+ import org.apache.helix.api.config.UserConfig;
+ import org.apache.helix.api.id.ClusterId;
+ import org.apache.helix.manager.zk.ZKHelixManager;
+ import org.apache.log4j.Logger;
+
++import com.google.common.collect.Maps;
++
/**
* Persisted configuration properties for a cluster
*/
public class ClusterConfiguration extends HelixProperty {
+ private static final String IDEAL_STATE_RULE_PREFIX = "IdealStateRule";
+ private static final Logger LOG = Logger.getLogger(ClusterConfiguration.class);
/**
* Instantiate for an id
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
index aae58a8,46d7ed7..452ca65
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
@@@ -1,17 -1,19 +1,5 @@@
package org.apache.helix.model;
--import org.apache.helix.HelixProperty;
--import org.apache.helix.ZNRecord;
--import org.apache.helix.api.config.NamespacedConfig;
--import org.apache.helix.api.config.ResourceConfig.ResourceType;
--import org.apache.helix.api.config.UserConfig;
--import org.apache.helix.api.id.ResourceId;
- import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
- import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
-
- import com.google.common.base.Enums;
- import com.google.common.base.Optional;
-
-import org.apache.helix.controller.provisioner.ProvisionerConfig;
-import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Enums;
-import com.google.common.base.Optional;
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@@ -31,6 -33,6 +19,20 @@@
* under the License.
*/
++import org.apache.helix.HelixProperty;
++import org.apache.helix.ZNRecord;
++import org.apache.helix.api.config.NamespacedConfig;
++import org.apache.helix.api.config.ResourceConfig.ResourceType;
++import org.apache.helix.api.config.UserConfig;
++import org.apache.helix.api.id.ResourceId;
++import org.apache.helix.controller.provisioner.ProvisionerConfig;
++import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
++import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
++import org.apache.log4j.Logger;
++
++import com.google.common.base.Enums;
++import com.google.common.base.Optional;
++
/**
* Persisted configuration properties for a resource
*/
@@@ -114,12 -122,12 +122,22 @@@ public class ResourceConfiguration exte
}
/**
+ * Check if this resource config has a rebalancer config
+ * @return true if a rebalancer config is attached, false otherwise
+ */
+ public boolean hasRebalancerConfig() {
+ return _record.getSimpleFields().containsKey(
+ RebalancerConfigHolder.class.getSimpleName() + NamespacedConfig.PREFIX_CHAR
+ + RebalancerConfigHolder.Fields.REBALANCER_CONFIG);
+ }
++
++ /**
+ * Get a ProvisionerConfig, if available
+ * @param clazz the class to cast to
+ * @return ProvisionerConfig, or null
+ */
+ public <T extends ProvisionerConfig> T getProvisionerConfig(Class<T> clazz) {
+ ProvisionerConfigHolder configHolder = new ProvisionerConfigHolder(this);
+ return configHolder.getProvisionerConfig(clazz);
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
----------------------------------------------------------------------
diff --cc helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
index 318ab66,9b772f2..d0c9bb8
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
@@@ -141,8 -141,7 +141,8 @@@ public class TestHelixConnection extend
participant.getStateMachineEngine().registerStateModelFactory(
StateModelDefId.from("MasterSlave"), new MockStateModelFactory());
- participant.startAsync();
+ participant.start();
+ Thread.sleep(1000);
// verify
final HelixDataAccessor accessor = connection.createDataAccessor(clusterId);
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
----------------------------------------------------------------------
diff --cc helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
index 0000000,f4153cc..f27ce79
mode 000000,100644..100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
@@@ -1,0 -1,340 +1,346 @@@
+ package org.apache.helix.integration;
+
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ import java.util.Collection;
+ import java.util.Date;
+ import java.util.List;
+ import java.util.Map;
++import java.util.concurrent.CountDownLatch;
++import java.util.concurrent.TimeUnit;
+
+ import org.apache.helix.HelixConnection;
+ import org.apache.helix.HelixController;
+ import org.apache.helix.HelixManager;
+ import org.apache.helix.HelixParticipant;
+ import org.apache.helix.TestHelper;
+ import org.apache.helix.ZkUnitTestBase;
+ import org.apache.helix.api.Cluster;
+ import org.apache.helix.api.Participant;
+ import org.apache.helix.api.accessor.ClusterAccessor;
+ import org.apache.helix.api.config.ClusterConfig;
+ import org.apache.helix.api.config.ContainerConfig;
+ import org.apache.helix.api.config.ResourceConfig;
+ import org.apache.helix.api.id.ClusterId;
+ import org.apache.helix.api.id.ControllerId;
+ import org.apache.helix.api.id.ParticipantId;
+ import org.apache.helix.api.id.ResourceId;
+ import org.apache.helix.api.id.StateModelDefId;
+ import org.apache.helix.controller.provisioner.ContainerId;
+ import org.apache.helix.controller.provisioner.ContainerProvider;
+ import org.apache.helix.controller.provisioner.ContainerSpec;
+ import org.apache.helix.controller.provisioner.ContainerState;
+ import org.apache.helix.controller.provisioner.Provisioner;
+ import org.apache.helix.controller.provisioner.ProvisionerConfig;
+ import org.apache.helix.controller.provisioner.ProvisionerRef;
+ import org.apache.helix.controller.provisioner.TargetProvider;
+ import org.apache.helix.controller.provisioner.TargetProviderResponse;
+ import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
+ import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+ import org.apache.helix.controller.serializer.DefaultStringSerializer;
+ import org.apache.helix.controller.serializer.StringSerializer;
+ import org.apache.helix.manager.zk.ZkHelixConnection;
+ import org.apache.helix.model.StateModelDefinition;
+ import org.apache.helix.tools.StateModelConfigGenerator;
+ import org.codehaus.jackson.annotate.JsonProperty;
+ import org.testng.Assert;
+ import org.testng.annotations.Test;
+
+ import com.google.common.collect.Lists;
+ import com.google.common.collect.Maps;
+ import com.google.common.util.concurrent.AbstractService;
+ import com.google.common.util.concurrent.ListenableFuture;
+ import com.google.common.util.concurrent.SettableFuture;
+
+ public class TestLocalContainerProvider extends ZkUnitTestBase {
+ private static final int MAX_PARTICIPANTS = 10;
+ static String clusterName = null;
+ static String resourceName = null;
+ static int allocated = 0;
+ static int started = 0;
+ static int stopped = 0;
+ static int deallocated = 0;
+ static HelixConnection connection = null;
++ static CountDownLatch latch = new CountDownLatch(MAX_PARTICIPANTS);
+
+ @Test
+ public void testBasic() throws Exception {
+ final int NUM_PARTITIONS = 4;
+ final int NUM_REPLICAS = 2;
+ resourceName = "TestDB0";
+
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ allocated = 0;
+ started = 0;
+ stopped = 0;
+ deallocated = 0;
+
+ // connect
+ connection = new ZkHelixConnection(ZK_ADDR);
+ connection.connect();
+
+ // create the cluster
+ ClusterId clusterId = ClusterId.from(clusterName);
+ ClusterAccessor clusterAccessor = connection.createClusterAccessor(clusterId);
+ StateModelDefinition masterSlave =
+ new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+ clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId).addStateModelDefinition(
+ masterSlave).build());
+
+ // add the resource with the local provisioner
+ ResourceId resourceId = ResourceId.from(resourceName);
+ ProvisionerConfig provisionerConfig = new LocalProvisionerConfig(resourceId);
+ RebalancerConfig rebalancerConfig =
+ new FullAutoRebalancerConfig.Builder(resourceId).addPartitions(NUM_PARTITIONS)
+ .replicaCount(NUM_REPLICAS).stateModelDefId(masterSlave.getStateModelDefId()).build();
+ clusterAccessor.addResourceToCluster(new ResourceConfig.Builder(ResourceId.from(resourceName))
+ .provisionerConfig(provisionerConfig).rebalancerConfig(rebalancerConfig).build());
+
+ // start controller
+ ControllerId controllerId = ControllerId.from("controller1");
+ HelixController controller = connection.createController(clusterId, controllerId);
+ controller.start();
+
- Thread.sleep(10000);
++ latch.await(10000, TimeUnit.MILLISECONDS);
+
+ // clean up
+ controller.stop();
+ connection.disconnect();
+
+ Assert.assertEquals(allocated, MAX_PARTICIPANTS);
+ Assert.assertEquals(started, MAX_PARTICIPANTS);
+ Assert.assertEquals(stopped, MAX_PARTICIPANTS);
+ Assert.assertEquals(deallocated, MAX_PARTICIPANTS);
+ }
+
+ /**
+ * Use Guava's service to wrap a participant lifecycle
+ */
+ public static class ParticipantService extends AbstractService {
+ private final ClusterId _clusterId;
+ private final ParticipantId _participantId;
+ private HelixParticipant _participant;
+
+ public ParticipantService(ClusterId clusterId, ParticipantId participantId) {
+ // TODO: probably should pass a connection in here
+ _clusterId = clusterId;
+ _participantId = participantId;
+ }
+
+ @Override
+ protected void doStart() {
+ _participant = connection.createParticipant(_clusterId, _participantId);
+ _participant.getStateMachineEngine().registerStateModelFactory(
+ StateModelDefId.from("MasterSlave"), new TestHelixConnection.MockStateModelFactory());
+ _participant.start();
+ notifyStarted();
+ }
+
+ @Override
+ protected void doStop() {
+ _participant.stop();
+ notifyStopped();
+ }
+
+ }
+
+ /**
+ * Bare-bones ProvisionerConfig
+ */
+ public static class LocalProvisionerConfig implements ProvisionerConfig {
+ private ResourceId _resourceId;
+ private Class<? extends StringSerializer> _serializerClass;
+ private ProvisionerRef _provisionerRef;
+
+ public LocalProvisionerConfig(@JsonProperty("resourceId") ResourceId resourceId) {
+ _resourceId = resourceId;
+ _serializerClass = DefaultStringSerializer.class;
+ _provisionerRef = ProvisionerRef.from(LocalProvisioner.class.getName());
+ }
+
+ @Override
+ public ResourceId getResourceId() {
+ return _resourceId;
+ }
+
+ @Override
+ public ProvisionerRef getProvisionerRef() {
+ return _provisionerRef;
+ }
+
+ public void setProvisionerRef(ProvisionerRef provisionerRef) {
+ _provisionerRef = provisionerRef;
+ }
+
+ @Override
+ public Class<? extends StringSerializer> getSerializerClass() {
+ return _serializerClass;
+ }
+
+ public void setSerializerClass(Class<? extends StringSerializer> serializerClass) {
+ _serializerClass = serializerClass;
+ }
+ }
+
+ /**
+ * Provisioner that will start and stop participants locally
+ */
+ public static class LocalProvisioner implements Provisioner, TargetProvider, ContainerProvider {
+ private HelixManager _helixManager;
+ private ClusterId _clusterId;
+ private int _askCount;
+ private Map<ContainerId, ContainerState> _states;
+ private Map<ContainerId, ParticipantId> _containerParticipants;
+ private Map<ContainerId, ParticipantService> _participants;
+
+ @Override
+ public void init(HelixManager helixManager, ResourceConfig resourceConfig) {
+ // TODO: would be nice to have a HelixConnection instead of a HelixManager
+ _helixManager = helixManager;
+ _clusterId = ClusterId.from(_helixManager.getClusterName());
+ _askCount = 0;
+ _states = Maps.newHashMap();
+ _containerParticipants = Maps.newHashMap();
+ _participants = Maps.newHashMap();
+ }
+
+ @Override
+ public ListenableFuture<ContainerId> allocateContainer(ContainerSpec spec) {
+ // allocation is a no-op
- ContainerId containerId = spec.getContainerId();
++ ContainerId containerId = ContainerId.from(spec.getParticipantId().toString());
+ _states.put(containerId, ContainerState.ACQUIRED);
++ _containerParticipants.put(containerId, spec.getParticipantId());
+ allocated++;
+ SettableFuture<ContainerId> future = SettableFuture.create();
+ future.set(containerId);
+ return future;
+ }
+
+ @Override
+ public ListenableFuture<Boolean> deallocateContainer(ContainerId containerId) {
+ // deallocation is a no-op
+ _states.put(containerId, ContainerState.FINALIZED);
+ deallocated++;
++ latch.countDown();
+ SettableFuture<Boolean> future = SettableFuture.create();
+ future.set(true);
+ return future;
+ }
+
+ @Override
+ public ListenableFuture<Boolean> startContainer(ContainerId containerId, Participant participant) {
+ ParticipantService participantService =
+ new ParticipantService(_clusterId, _containerParticipants.get(containerId));
+ participantService.startAsync();
+ participantService.awaitRunning();
+ _participants.put(containerId, participantService);
+ _states.put(containerId, ContainerState.CONNECTED);
+ started++;
+ SettableFuture<Boolean> future = SettableFuture.create();
+ future.set(true);
+ return future;
+ }
+
+ @Override
+ public ListenableFuture<Boolean> stopContainer(ContainerId containerId) {
+ ParticipantService participant = _participants.get(containerId);
+ participant.stopAsync();
+ participant.awaitTerminated();
+ _states.put(containerId, ContainerState.HALTED);
+ stopped++;
+ SettableFuture<Boolean> future = SettableFuture.create();
+ future.set(true);
+ return future;
+ }
+
+ @Override
+ public TargetProviderResponse evaluateExistingContainers(Cluster cluster,
+ ResourceId resourceId, Collection<Participant> participants) {
+ TargetProviderResponse response = new TargetProviderResponse();
+ // ask for two containers at a time
+ List<ContainerSpec> containersToAcquire = Lists.newArrayList();
+ boolean asked = false;
+ if (_askCount < MAX_PARTICIPANTS) {
+ containersToAcquire.add(new ContainerSpec(ParticipantId.from("container" + _askCount)));
- containersToAcquire.add(new ContainerSpec(ParticipantId.from("container" + (_askCount + 1))));
++ containersToAcquire
++ .add(new ContainerSpec(ParticipantId.from("container" + (_askCount + 1))));
+ asked = true;
+ }
+ List<Participant> containersToStart = Lists.newArrayList();
+ List<Participant> containersToStop = Lists.newArrayList();
+ List<Participant> containersToRelease = Lists.newArrayList();
+ int stopCount = 0;
+ for (Participant participant : participants) {
+ ContainerConfig containerConfig = participant.getContainerConfig();
+ if (containerConfig != null && containerConfig.getState() != null) {
+ ContainerState state = containerConfig.getState();
+ switch (state) {
+ case ACQUIRED:
+ // acquired containers are ready to start
+ containersToStart.add(participant);
+ break;
+ case CONNECTED:
+ // stop at most two active at a time, wait for everything to be up first
+ if (stopCount < 2 && _askCount >= MAX_PARTICIPANTS) {
+ containersToStop.add(participant);
+ stopCount++;
+ }
+ break;
+ case HALTED:
+ // halted containers can be released
+ containersToRelease.add(participant);
+ break;
+ default:
+ break;
+ }
+ ContainerId containerId = containerConfig.getId();
+ if (containerId != null) {
+ _containerParticipants.put(containerId, participant.getId());
+ _states.put(containerId, state);
+ }
+ }
+ }
+ // update acquire request count
+ if (asked) {
+ _askCount += 2;
+ }
+ // set the response
+ response.setContainersToAcquire(containersToAcquire);
+ response.setContainersToStart(containersToStart);
+ response.setContainersToStop(containersToStop);
+ response.setContainersToRelease(containersToRelease);
+ return response;
+ }
+
+ @Override
+ public ContainerProvider getContainerProvider() {
+ return this;
+ }
+
+ @Override
+ public TargetProvider getTargetProvider() {
+ return this;
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-core/src/test/java/org/apache/helix/integration/TestSharedConnection.java
----------------------------------------------------------------------
diff --cc helix-core/src/test/java/org/apache/helix/integration/TestSharedConnection.java
index bf89cdb,0000000..0aef00e
mode 100644,000000..100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSharedConnection.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSharedConnection.java
@@@ -1,199 -1,0 +1,199 @@@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixController;
+import org.apache.helix.HelixParticipant;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.manager.zk.HelixConnectionAdaptor;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.manager.zk.ZkHelixLeaderElection;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Ensure that the external view is able to update properly when participants share a connection.
+ */
+public class TestSharedConnection extends ZkUnitTestBase {
+ /**
+ * Ensure that the external view is able to update properly when participants share a connection.
+ */
+ @Test
+ public void testSharedParticipantConnection() throws Exception {
+ final int NUM_PARTICIPANTS = 2;
+ final int NUM_PARTITIONS = 4;
+ final int NUM_REPLICAS = 2;
+ final String RESOURCE_NAME = "TestDB0";
+
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ // Set up cluster
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ NUM_PARTITIONS, // partitions per resource
+ NUM_PARTICIPANTS, // number of nodes
+ NUM_REPLICAS, // replicas
+ "OnlineOffline", RebalanceMode.CUSTOMIZED, true); // do rebalance
+
+ // Connect
+ HelixConnection connection = new ZkHelixConnection(ZK_ADDR);
+ connection.connect();
+
+ // Start some participants
+ HelixParticipant[] participants = new HelixParticipant[NUM_PARTICIPANTS];
+ for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+ participants[i] =
+ connection.createParticipant(ClusterId.from(clusterName),
+ ParticipantId.from("localhost_" + (12918 + i)));
+ participants[i].getStateMachineEngine().registerStateModelFactory(
+ StateModelDefId.from("OnlineOffline"), new TestHelixConnection.MockStateModelFactory());
- participants[i].startAsync();
++ participants[i].start();
+ }
+
+ // Start the controller
+ HelixController controller =
+ connection.createController(ClusterId.from(clusterName), ControllerId.from("controller"));
- controller.startAsync();
++ controller.start();
+ Thread.sleep(500);
+
+ // Verify balanced cluster
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Drop a partition from the first participant
+ HelixAdmin admin = connection.createClusterManagementTool();
+ IdealState idealState = admin.getResourceIdealState(clusterName, RESOURCE_NAME);
+ Map<ParticipantId, State> participantStateMap =
+ idealState.getParticipantStateMap(PartitionId.from(RESOURCE_NAME + "_0"));
+ participantStateMap.remove(ParticipantId.from("localhost_12918"));
+ idealState.setParticipantStateMap(PartitionId.from(RESOURCE_NAME + "_0"), participantStateMap);
+ admin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState);
+ Thread.sleep(1000);
+
+ // Verify balanced cluster
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Drop a partition from the second participant
+ participantStateMap = idealState.getParticipantStateMap(PartitionId.from(RESOURCE_NAME + "_1"));
+ participantStateMap.remove(ParticipantId.from("localhost_12919"));
+ idealState.setParticipantStateMap(PartitionId.from(RESOURCE_NAME + "_1"), participantStateMap);
+ admin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState);
+ Thread.sleep(1000);
+
+ // Verify balanced cluster
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Clean up
- controller.stopAsync();
++ controller.stop();
+ for (HelixParticipant participant : participants) {
- participant.stopAsync();
++ participant.stop();
+ }
+ admin.dropCluster(clusterName);
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ /**
+ * Ensure that only one controller with a shared connection thinks it's leader
+ */
+ @Test
+ public void testSharedControllerConnection() throws Exception {
+ final int NUM_PARTICIPANTS = 2;
+ final int NUM_PARTITIONS = 4;
+ final int NUM_REPLICAS = 2;
+ final int NUM_CONTROLLERS = 2;
+
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ // Set up cluster
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ NUM_PARTITIONS, // partitions per resource
+ NUM_PARTICIPANTS, // number of nodes
+ NUM_REPLICAS, // replicas
+ "OnlineOffline", RebalanceMode.CUSTOMIZED, true); // do rebalance
+
+ // Connect
+ HelixConnection connection = new ZkHelixConnection(ZK_ADDR);
+ connection.connect();
+
+ // Create a couple controllers
+ HelixController[] controllers = new HelixController[NUM_CONTROLLERS];
+ for (int i = 0; i < NUM_CONTROLLERS; i++) {
+ controllers[i] =
+ connection.createController(ClusterId.from(clusterName),
+ ControllerId.from("controller_" + i));
- controllers[i].startAsync();
++ controllers[i].start();
+ }
+ Thread.sleep(1000);
+
+ // Now verify that exactly one is leader
+ int leaderCount = 0;
+ for (HelixController controller : controllers) {
+ HelixConnectionAdaptor adaptor = new HelixConnectionAdaptor(controller);
+ boolean result = ZkHelixLeaderElection.tryUpdateController(adaptor);
+ if (result) {
+ leaderCount++;
+ }
+ }
+ Assert.assertEquals(leaderCount, 1);
+
+ // Clean up
+ for (HelixController controller : controllers) {
- controller.stopAsync();
++ controller.stop();
+ }
+ HelixAdmin admin = connection.createClusterManagementTool();
+ admin.dropCluster(clusterName);
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/README.md
----------------------------------------------------------------------
diff --cc helix-provisioning/README.md
index 0000000,77a4e81..30e160d
mode 000000,100644..100644
--- a/helix-provisioning/README.md
+++ b/helix-provisioning/README.md
@@@ -1,0 -1,16 +1,35 @@@
-Checkout helix provisioning branch
++<!---
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++-->
++
++Check out the helix provisioning branch
+ cd helix
+ mvn clean package -DskipTests
+ cd helix-provisioning
+
+
-yyDownload and install YARN start all services (datanode, resourcemanage, nodemanager, jobHistoryServer(optional))
++Download and install YARN, then start all services (datanode, resourcemanager, nodemanager, jobHistoryServer (optional))
+
+ Instructions for setting up a local YARN cluster will be posted later.
+
+ target/helix-provisioning-pkg/bin/app-launcher.sh org.apache.helix.provisioning.yarn.example.HelloWordAppSpecFactory /Users/kgopalak/Documents/projects/incubator-helix/helix-provisioning/src/main/resources/hello_world_app_spec.yaml
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/AppConfig.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/AppConfig.java
index 0000000,a51db1c..81a9a2c
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/AppConfig.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/AppConfig.java
@@@ -1,0 -1,17 +1,35 @@@
+ package org.apache.helix.provisioning;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.HashMap;
+ import java.util.Map;
+
-
+ public class AppConfig {
- public Map<String, String> config = new HashMap<String, String>();
-
- public String getValue(String key) {
- return (config != null ? config.get(key) : null);
- }
-
- public void setValue(String key, String value){
- config.put(key, value);
- }
++ public Map<String, String> config = new HashMap<String, String>();
++
++ public String getValue(String key) {
++ return (config != null ? config.get(key) : null);
++ }
++
++ public void setValue(String key, String value) {
++ config.put(key, value);
++ }
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpec.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpec.java
index 0000000,f7454d2..e50cfb4
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpec.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpec.java
@@@ -1,0 -1,29 +1,46 @@@
+ package org.apache.helix.provisioning;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.net.URI;
+ import java.util.List;
+
-
-
+ public interface ApplicationSpec {
+ /**
+ * Returns the name of the application
+ * @return
+ */
+ String getAppName();
+
+ AppConfig getConfig();
+
+ List<String> getServices();
+
+ URI getAppMasterPackage();
-
++
+ URI getServicePackage(String serviceName);
-
++
+ String getServiceMainClass(String service);
+
+ ServiceConfig getServiceConfig(String serviceName);
+
+ List<TaskConfig> getTaskConfigs();
+
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpecFactory.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpecFactory.java
index 0000000,0c524f2..866e10e
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpecFactory.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpecFactory.java
@@@ -1,0 -1,9 +1,28 @@@
+ package org.apache.helix.provisioning;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.io.InputStream;
+
+ public interface ApplicationSpecFactory {
-
++
+ ApplicationSpec fromYaml(InputStream yamlFile);
+
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerAskResponse.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerAskResponse.java
index 0000000,18f66d2..1a2b8aa
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerAskResponse.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerAskResponse.java
@@@ -1,0 -1,17 +1,36 @@@
+ package org.apache.helix.provisioning;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import org.apache.hadoop.yarn.api.records.Container;
+
+ public class ContainerAskResponse {
-
++
+ Container container;
+
+ public Container getContainer() {
+ return container;
+ }
+
+ public void setContainer(Container container) {
+ this.container = container;
+ }
-
++
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerLaunchResponse.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerLaunchResponse.java
index 0000000,ea6ef12..48ae3f1
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerLaunchResponse.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerLaunchResponse.java
@@@ -1,0 -1,5 +1,24 @@@
+ package org.apache.helix.provisioning;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ /**
+ * Empty type: declares no fields or methods.
+ * NOTE(review): presumably the result type of a container launch operation, judging by the
+ * class name -- confirm against callers.
+ */
+ public class ContainerLaunchResponse {
+
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerReleaseResponse.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerReleaseResponse.java
index 0000000,e4a5dc4..dc5289c
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerReleaseResponse.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerReleaseResponse.java
@@@ -1,0 -1,5 +1,24 @@@
+ package org.apache.helix.provisioning;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ /**
+ * Empty type: declares no fields or methods.
+ * NOTE(review): presumably the result type of a container release operation, judging by the
+ * class name -- confirm against callers.
+ */
+ public class ContainerReleaseResponse {
+
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerStopResponse.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerStopResponse.java
index 0000000,d8c8a46..31f90a4
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerStopResponse.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerStopResponse.java
@@@ -1,0 -1,5 +1,24 @@@
+ package org.apache.helix.provisioning;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ /**
+ * Empty type: declares no fields or methods.
+ * NOTE(review): presumably the result type of a container stop operation, judging by the
+ * class name -- confirm against callers.
+ */
+ public class ContainerStopResponse {
+
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/HelixYarnUtil.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/HelixYarnUtil.java
index 0000000,80ac16b..dc71f94
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/HelixYarnUtil.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/HelixYarnUtil.java
@@@ -1,0 -1,42 +1,61 @@@
+ package org.apache.helix.provisioning;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import org.apache.log4j.Logger;
+
+ public class HelixYarnUtil {
+ private static Logger LOG = Logger.getLogger(HelixYarnUtil.class);
+
+ @SuppressWarnings("unchecked")
+ public static <T extends ApplicationSpecFactory> T createInstance(String className) {
+ Class<ApplicationSpecFactory> factoryClazz = null;
+ {
+ try {
+ factoryClazz =
+ (Class<ApplicationSpecFactory>) Thread.currentThread().getContextClassLoader()
+ .loadClass(className);
+ } catch (ClassNotFoundException e) {
+ try {
+ factoryClazz =
+ (Class<ApplicationSpecFactory>) ClassLoader.getSystemClassLoader().loadClass(
+ className);
+ } catch (ClassNotFoundException e1) {
+ try {
+ factoryClazz = (Class<ApplicationSpecFactory>) Class.forName(className);
+ } catch (ClassNotFoundException e2) {
+
+ }
+ }
+ }
+ }
+ System.out.println(System.getProperty("java.class.path"));
+ if (factoryClazz == null) {
+ LOG.error("Unable to find class:" + className);
+ }
+ ApplicationSpecFactory factory = null;
+ try {
+ factory = factoryClazz.newInstance();
+ } catch (Exception e) {
+ LOG.error("Unable to create instance of class: " + className, e);
+ }
+ return (T) factory;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/ParticipantLauncher.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/ParticipantLauncher.java
index 0000000,59a9eb5..6bbe9ad
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ParticipantLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ParticipantLauncher.java
@@@ -1,0 -1,137 +1,156 @@@
+ package org.apache.helix.provisioning;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.Arrays;
+ import java.util.concurrent.TimeUnit;
+
+ import org.apache.commons.cli.CommandLine;
+ import org.apache.commons.cli.GnuParser;
+ import org.apache.commons.cli.Options;
+ import org.apache.helix.HelixConnection;
+ import org.apache.helix.NotificationContext;
+ import org.apache.helix.api.id.ClusterId;
+ import org.apache.helix.api.id.ParticipantId;
+ import org.apache.helix.manager.zk.ZkHelixConnection;
+ import org.apache.helix.messaging.handling.HelixTaskResult;
+ import org.apache.helix.messaging.handling.MessageHandler;
+ import org.apache.helix.messaging.handling.MessageHandlerFactory;
+ import org.apache.helix.model.Message;
+ import org.apache.helix.model.Message.MessageType;
+ import org.apache.helix.participant.AbstractParticipantService;
+ import org.apache.log4j.Logger;
+
+ /**
+ * Main class that invokes the Participant Api
+ */
+ public class ParticipantLauncher {
+ private static Logger LOG = Logger.getLogger(ParticipantLauncher.class);
+
+ public static void main(String[] args) {
+
+ System.out.println("Starting Helix Participant: " + Arrays.toString(args));
+ Options opts;
+ opts = new Options();
+ opts.addOption("cluster", true, "Cluster name, default app name");
+ opts.addOption("participantId", true, "Participant Id");
+ opts.addOption("zkAddress", true, "Zookeeper address");
+ opts.addOption("participantClass", true, "Participant service class");
+ try {
+ CommandLine cliParser = new GnuParser().parse(opts, args);
+ String zkAddress = cliParser.getOptionValue("zkAddress");
+ final HelixConnection connection = new ZkHelixConnection(zkAddress);
+ connection.connect();
+ ClusterId clusterId = ClusterId.from(cliParser.getOptionValue("cluster"));
+ ParticipantId participantId = ParticipantId.from(cliParser.getOptionValue("participantId"));
+ String participantClass = cliParser.getOptionValue("participantClass");
+ @SuppressWarnings("unchecked")
+ Class<? extends AbstractParticipantService> clazz =
+ (Class<? extends AbstractParticipantService>) Class.forName(participantClass);
+ final AbstractParticipantService containerParticipant =
+ clazz.getConstructor(HelixConnection.class, ClusterId.class, ParticipantId.class)
+ .newInstance(connection, clusterId, participantId);
+ containerParticipant.startAsync();
+ containerParticipant.awaitRunning(60, TimeUnit.SECONDS);
+ containerParticipant
+ .getParticipant()
+ .getMessagingService()
+ .registerMessageHandlerFactory(MessageType.SHUTDOWN.toString(),
+ new ShutdownMessageHandlerFactory(containerParticipant, connection));
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ LOG.info("Received a shutdown signal. Stopping participant");
+ containerParticipant.stopAsync();
+ containerParticipant.awaitTerminated();
+ connection.disconnect();
+ }
+ }) {
+
+ });
+ Thread.currentThread().join();
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.out.println("Failed to start Helix participant" + e);
+ // System.exit(1);
+ }
+ try {
+ Thread.currentThread().join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ public static class ShutdownMessageHandlerFactory implements MessageHandlerFactory {
+ private final AbstractParticipantService _service;
+ private final HelixConnection _connection;
+
+ public ShutdownMessageHandlerFactory(AbstractParticipantService service,
+ HelixConnection connection) {
+ _service = service;
+ _connection = connection;
+ }
+
+ @Override
+ public MessageHandler createHandler(Message message, NotificationContext context) {
+ return new ShutdownMessageHandler(_service, _connection, message, context);
+ }
+
+ @Override
+ public String getMessageType() {
+ return MessageType.SHUTDOWN.toString();
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ }
+
+ public static class ShutdownMessageHandler extends MessageHandler {
+ private final AbstractParticipantService _service;
+ private final HelixConnection _connection;
+
+ public ShutdownMessageHandler(AbstractParticipantService service, HelixConnection connection,
+ Message message, NotificationContext context) {
+ super(message, context);
+ _service = service;
+ _connection = connection;
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() throws InterruptedException {
+ LOG.info("Received a shutdown message. Trying to shut down.");
+ _service.stopAsync();
+ _service.awaitTerminated();
+ _connection.disconnect();
+ LOG.info("Shutdown complete. Process exiting gracefully");
+ System.exit(0);
+ return null;
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode code, ErrorType type) {
+ LOG.error("Shutdown message error", e);
+ }
+
+ }
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/ServiceConfig.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/ServiceConfig.java
index 0000000,55ca0ae..f2f8189
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ServiceConfig.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ServiceConfig.java
@@@ -1,0 -1,16 +1,32 @@@
+ package org.apache.helix.provisioning;
+
-import java.util.HashMap;
-import java.util.Map;
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
+
+ import org.apache.helix.api.Scope;
+ import org.apache.helix.api.config.UserConfig;
+ import org.apache.helix.api.id.ResourceId;
+
-public class ServiceConfig extends UserConfig{
-
- public ServiceConfig(Scope<ResourceId> scope) {
- super(scope);
++/**
++ * A {@link UserConfig} subclass that declares no members of its own beyond its constructor.
++ */
++public class ServiceConfig extends UserConfig {
++
++ /**
++ * @param scope the resource scope, passed unchanged to the {@link UserConfig} constructor
++ */
++ public ServiceConfig(Scope<ResourceId> scope) {
++ super(scope);
+ }
-
++
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
index 0000000,442d074..3e3b767
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
@@@ -1,0 -1,29 +1,48 @@@
+ package org.apache.helix.provisioning;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.net.URI;
+ import java.net.URISyntaxException;
+ import java.util.HashMap;
+ import java.util.Map;
+
+ import org.apache.log4j.Logger;
+
+ /**
+  * Plain data holder describing a task: a name, the location of a YAML file given as a URI
+  * string, and a map of string configuration values.
+  */
+ public class TaskConfig {
+   private static final Logger LOG = Logger.getLogger(TaskConfig.class);
+
+   public Map<String, String> config = new HashMap<String, String>();
+   public String yamlFile;
+   public String name;
+
+   /**
+    * Convert {@link #yamlFile} to a {@link URI}.
+    * @return the parsed URI, or null if yamlFile is unset or is not a valid URI (the parse error
+    *         is logged)
+    */
+   public URI getYamlURI() {
+     if (yamlFile == null) {
+       return null;
+     }
+     try {
+       return new URI(yamlFile);
+     } catch (URISyntaxException e) {
+       LOG.error("Error parsing URI for task config", e);
+       return null;
+     }
+   }
+
+   /**
+    * Look up a single configuration value.
+    * @param key the configuration key
+    * @return the mapped value, or null if the key is absent or the config map itself is null
+    */
+   public String getValue(String key) {
+     if (config == null) {
+       return null;
+     }
+     return config.get(key);
+   }
+ }