You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/10 19:04:48 UTC
[05/50] [abbrv] git commit: Made container states more consistent,
changed yarn target provider logic
Made container states more consistent, changed yarn target provider logic
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/57b4b180
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/57b4b180
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/57b4b180
Branch: refs/heads/master
Commit: 57b4b180e0c0b7f3ae0c21191af1f72bca61732f
Parents: cb6aa4f
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Feb 19 18:58:00 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Feb 19 18:58:00 2014 -0800
----------------------------------------------------------------------
.../controller/provisioner/ContainerState.java | 5 +-
.../stages/ContainerProvisioningStage.java | 9 ++-
.../integration/TestLocalContainerProvider.java | 4 +-
.../provisioning/yarn/YarnProvisioner.java | 83 ++++++++++++--------
4 files changed, 61 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/57b4b180/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
index cf4b736..449f636 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
@@ -23,8 +23,9 @@ public enum ContainerState {
ACQUIRING,
ACQUIRED,
CONNECTING,
- ACTIVE,
- TEARDOWN,
+ CONNECTED,
+ DISCONNECTED,
+ HALTING,
HALTED,
FINALIZING,
FINALIZED,
http://git-wip-us.apache.org/repos/asf/helix/blob/57b4b180/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index 48166bf..42c8218 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -22,7 +22,6 @@ package org.apache.helix.controller.stages;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import java.util.UUID;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
@@ -166,12 +165,13 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
existingInstance);
// create the helix participant and add it to cluster
- ListenableFuture<Boolean> future = containerProvider.startContainer(containerId, participant);
+ ListenableFuture<Boolean> future =
+ containerProvider.startContainer(containerId, participant);
FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
- ContainerState.ACTIVE);
+ ContainerState.CONNECTED);
}
@Override
@@ -225,7 +225,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
.toString());
final ContainerId containerId = existingInstance.getContainerId();
existingInstance.setInstanceEnabled(false);
- existingInstance.setContainerState(ContainerState.TEARDOWN);
+ existingInstance.setContainerState(ContainerState.HALTING);
accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
existingInstance);
// stop the container
@@ -267,6 +267,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
InstanceConfig existingInstance =
helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString());
existingInstance.setContainerState(state);
+ existingInstance.setInstanceEnabled(state.equals(ContainerState.CONNECTED));
accessor.updateProperty(keyBuilder.instanceConfig(participantId.toString()), existingInstance);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/57b4b180/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
index 0e4c803..0f7be64 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
@@ -250,7 +250,7 @@ public class TestLocalContainerProvider extends ZkUnitTestBase {
participantService.startAsync();
participantService.awaitRunning();
_participants.put(containerId, participantService);
- _states.put(containerId, ContainerState.ACTIVE);
+ _states.put(containerId, ContainerState.CONNECTED);
started++;
SettableFuture<Boolean> future = SettableFuture.create();
future.set(true);
@@ -294,7 +294,7 @@ public class TestLocalContainerProvider extends ZkUnitTestBase {
// acquired containers are ready to start
containersToStart.add(participant);
break;
- case ACTIVE:
+ case CONNECTED:
// stop at most two active at a time, wait for everything to be up first
if (stopCount < 2 && _askCount >= MAX_PARTICIPANTS) {
containersToStop.add(participant);
http://git-wip-us.apache.org/repos/asf/helix/blob/57b4b180/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
index 4fcc219..daac87b 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@ -1,10 +1,6 @@
package org.apache.helix.provisioning.yarn;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -13,22 +9,13 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import java.util.concurrent.Executors;
-import org.apache.commons.compress.archivers.ArchiveStreamFactory;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.Container;
@@ -55,13 +42,13 @@ import org.apache.helix.controller.provisioner.ContainerProvider;
import org.apache.helix.controller.provisioner.ContainerSpec;
import org.apache.helix.controller.provisioner.ContainerState;
import org.apache.helix.controller.provisioner.Provisioner;
-import org.apache.helix.controller.provisioner.ProvisionerConfig;
import org.apache.helix.controller.provisioner.TargetProvider;
import org.apache.helix.controller.provisioner.TargetProviderResponse;
import org.apache.helix.model.InstanceConfig;
-import com.google.common.collect.Lists;
import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -274,39 +261,57 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
.getProvisionerConfig();
int targetNumContainers = provisionerConfig.getNumContainers();
+ // Ids of containers in any state other than FINALIZED or FAILED; these are not acquired again
Set<ContainerId> existingContainersIdSet = new HashSet<ContainerId>();
-
+
+ // Cache halted containers to determine which to restart and which to release
+ Map<ContainerId, Participant> excessHaltedContainers = Maps.newHashMap();
+
+ // Cache connected containers to determine which to keep running and which to stop
+ Map<ContainerId, Participant> excessActiveContainers = Maps.newHashMap();
for (Participant participant : participants) {
ContainerConfig containerConfig = participant.getContainerConfig();
if (containerConfig != null && containerConfig.getState() != null) {
ContainerState state = containerConfig.getState();
switch (state) {
+ case ACQUIRING:
+ existingContainersIdSet.add(containerConfig.getId());
+ break;
case ACQUIRED:
// acquired containers are ready to start
+ existingContainersIdSet.add(containerConfig.getId());
containersToStart.add(participant);
break;
- case ACTIVE:
+ case CONNECTING:
existingContainersIdSet.add(containerConfig.getId());
break;
- case HALTED:
- // halted containers can be released
- containersToRelease.add(participant);
+ case CONNECTED:
+ // active containers can be stopped or kept active
+ existingContainersIdSet.add(containerConfig.getId());
+ excessActiveContainers.put(containerConfig.getId(), participant);
break;
- case ACQUIRING:
+ case DISCONNECTED:
+ // disconnected containers must be stopped; NOTE(review): no break, falls through to HALTING - confirm intended
+ existingContainersIdSet.add(containerConfig.getId());
+ containersToStop.add(participant);
+ case HALTING:
existingContainersIdSet.add(containerConfig.getId());
break;
- case CONNECTING:
+ case HALTED:
+ // halted containers can be released or restarted
+ existingContainersIdSet.add(containerConfig.getId());
+ excessHaltedContainers.put(containerConfig.getId(), participant);
break;
- case FAILED:
- //remove the failed instance
- _helixManager.getClusterManagmentTool().dropInstance(cluster.getId().toString(), new InstanceConfig(participant.getId()));
+ case FINALIZING:
+ existingContainersIdSet.add(containerConfig.getId());
break;
case FINALIZED:
break;
- case FINALIZING:
- break;
- case TEARDOWN:
+ case FAILED:
+ // remove the failed instance
+ _helixManager.getClusterManagmentTool().dropInstance(cluster.getId().toString(),
+ new InstanceConfig(participant.getId()));
break;
default:
break;
@@ -318,18 +323,32 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
}
}
}
-
+
for (int i = 0; i < targetNumContainers; i++) {
ContainerId containerId = ContainerId.from(resourceId + "_container_" + (i));
- if(!existingContainersIdSet.contains(containerId)){
+ excessActiveContainers.remove(containerId); // don't stop this container if active
+ if (excessHaltedContainers.containsKey(containerId)) {
+ // Halted containers can be restarted if necessary
+ Participant participant = excessHaltedContainers.get(containerId);
+ containersToStart.add(participant);
+ excessHaltedContainers.remove(containerId); // don't release this container
+ } else if (!existingContainersIdSet.contains(containerId)) {
+ // Unallocated containers must be allocated
ContainerSpec containerSpec = new ContainerSpec(containerId);
ParticipantId participantId = ParticipantId.from(containerId.stringify());
- ParticipantConfig participantConfig = applicationSpec.getParticipantConfig(resourceId.stringify(), participantId);
+ ParticipantConfig participantConfig =
+ applicationSpec.getParticipantConfig(resourceId.stringify(), participantId);
containerSpec.setMemory(participantConfig.getUserConfig().getIntField("memory", 1024));
containersToAcquire.add(containerSpec);
}
}
-
+
+ // Add all the containers that should be stopped because they fall outside the target range
+ containersToStop.addAll(excessActiveContainers.values());
+
+ // Add halted containers that should not be restarted
+ containersToRelease.addAll(excessHaltedContainers.values());
+
response.setContainersToAcquire(containersToAcquire);
response.setContainersToStart(containersToStart);
response.setContainersToRelease(containersToRelease);