You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/12/07 21:10:10 UTC
[10/76] [abbrv] hadoop git commit: YARN-5461. Initial code ported
from slider-core module. (jianhe)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java
new file mode 100644
index 0000000..a8aa1a2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.providers.ProviderRole;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Binding information for application states; designed to be extensible
+ * so that tests don't have to be massively reworked when new arguments
+ * are added.
+ * <p>
+ * All fields are public and mutable: populate them, then call
+ * {@link #validate()} to check that the mandatory ones are set.
+ */
+public class AppStateBindingInfo {
+  /** Instance definition. Required: checked by {@link #validate()}. */
+  public AggregateConf instanceDefinition;
+  /** Service configuration; defaults to a newly created configuration. */
+  public Configuration serviceConfig = new Configuration();
+  /** Published provider configuration; created with {@code new Configuration(false)}. */
+  public Configuration publishedProviderConf = new Configuration(false);
+  /** Provider roles; initially empty. */
+  public List<ProviderRole> roles = new ArrayList<>();
+  /** Filesystem. Required: checked by {@link #validate()}. */
+  public FileSystem fs;
+  /** History path. Required: checked by {@link #validate()}. */
+  public Path historyPath;
+  /** Live containers; initially empty. Not checked by {@link #validate()}. */
+  public List<Container> liveContainers = new ArrayList<>(0);
+  /** Application information; initially empty. Not checked by {@link #validate()}. */
+  public Map<String, String> applicationInfo = new HashMap<>();
+  /** Selector of containers to release; defaults to a {@link SimpleReleaseSelector}. */
+  public ContainerReleaseSelector releaseSelector = new SimpleReleaseSelector();
+  /** node reports off the RM. */
+  public List<NodeReport> nodeReports = new ArrayList<>(0);
+
+  /**
+   * Verify that every mandatory field is non-null.
+   * Each failure message names the field that was found to be null.
+   * @throws IllegalArgumentException if a mandatory field is null
+   */
+  public void validate() throws IllegalArgumentException {
+    Preconditions.checkArgument(instanceDefinition != null, "null instanceDefinition");
+    Preconditions.checkArgument(serviceConfig != null, "null serviceConfig");
+    Preconditions.checkArgument(publishedProviderConf != null, "null publishedProviderConf");
+    Preconditions.checkArgument(releaseSelector != null, "null releaseSelector");
+    Preconditions.checkArgument(roles != null, "null roles");
+    Preconditions.checkArgument(fs != null, "null fs");
+    Preconditions.checkArgument(historyPath != null, "null historyPath");
+    Preconditions.checkArgument(nodeReports != null, "null nodeReports");
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java
new file mode 100644
index 0000000..5b3a93c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+/**
+ * The possible outcomes of assigning an allocated container.
+ */
+public enum ContainerAllocationOutcome {
+
+  /** No request had been made for this allocation. */
+  Unallocated,
+
+  /** The placement was open. */
+  Open,
+
+  /** The container was explicitly allocated where it was requested. */
+  Placed,
+
+  /** The placement was an escalated one. */
+  Escalated;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationResults.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationResults.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationResults.java
new file mode 100644
index 0000000..e80639e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationResults.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Simple tuple describing the outcome of a container allocation.
+ * All fields are public and mutable.
+ */
+public class ContainerAllocationResults {
+
+  /**
+   * Outcome of this allocation: placed, escalated, ...
+   */
+  public ContainerAllocationOutcome outcome;
+
+  /**
+   * The outstanding request from which this allocation originated.
+   * Null when the outcome is {@link ContainerAllocationOutcome#Unallocated},
+   * because the allocation was not expected.
+   */
+  public OutstandingRequest origin;
+
+  /**
+   * Requests to add to the follow-up actions; the list may be empty.
+   */
+  public List<AbstractRMOperation> operations;
+
+  /**
+   * Create an instance with an empty operation list and no outcome or origin.
+   */
+  public ContainerAllocationResults() {
+    this.operations = new ArrayList<>(0);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAssignment.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAssignment.java
new file mode 100644
index 0000000..3e8a3c3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAssignment.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+/**
+ * Immutable structure binding an allocated container to a role,
+ * together with the outcome of its placement.
+ */
+public class ContainerAssignment {
+
+  /** The container that has been allocated. */
+  public final Container container;
+
+  /** The role to assign to the container. */
+  public final RoleStatus role;
+
+  /** Placement outcome: was this from history or not. */
+  public final ContainerAllocationOutcome placement;
+
+  /**
+   * Build an assignment.
+   * @param container container that has been allocated
+   * @param role role to assign to it
+   * @param placement placement outcome
+   */
+  public ContainerAssignment(Container container,
+      RoleStatus role,
+      ContainerAllocationOutcome placement) {
+    this.placement = placement;
+    this.role = role;
+    this.container = container;
+  }
+
+  @Override
+  public String toString() {
+    return "ContainerAssignment{"
+        + "container=" + container
+        + ", role=" + role
+        + ", placement=" + placement
+        + '}';
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerOutcome.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerOutcome.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerOutcome.java
new file mode 100644
index 0000000..59ab30b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerOutcome.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+
+/**
+ * The container outcomes which matter here: a slightly simplified view of
+ * {@link ContainerExitStatus}, which should also cope with any exit
+ * codes added in future.
+ */
+public enum ContainerOutcome {
+  Completed,
+  Failed,
+  Failed_limits_exceeded,
+  Node_failure,
+  Preempted;
+
+  /**
+   * Map an exit status onto a container outcome, using the values
+   * defined in {@link ContainerExitStatus}.
+   * @param exitStatus exit status
+   * @return the matching outcome; any status without an explicit mapping
+   * is {@link #Completed} when it is zero and {@link #Failed} otherwise.
+   */
+  public static ContainerOutcome fromExitStatus(int exitStatus) {
+    final ContainerOutcome outcome;
+    switch (exitStatus) {
+      case ContainerExitStatus.PREEMPTED:
+        outcome = Preempted;
+        break;
+
+      case ContainerExitStatus.DISKS_FAILED:
+        outcome = Node_failure;
+        break;
+
+      case ContainerExitStatus.KILLED_EXCEEDED_VMEM:
+      case ContainerExitStatus.KILLED_EXCEEDED_PMEM:
+        outcome = Failed_limits_exceeded;
+        break;
+
+      case ContainerExitStatus.KILLED_AFTER_APP_COMPLETION:
+      case ContainerExitStatus.KILLED_BY_RESOURCEMANAGER:
+      case ContainerExitStatus.KILLED_BY_APPMASTER:
+      case ContainerExitStatus.ABORTED:
+        // could either be a release or node failure. Treat as completion
+        outcome = Completed;
+        break;
+
+      default:
+        if (exitStatus == 0) {
+          outcome = Completed;
+        } else {
+          outcome = Failed;
+        }
+        break;
+    }
+    return outcome;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java
new file mode 100644
index 0000000..df222fa
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.Locale;
+
+/**
+ * Class containing the logic to build/split container priorities into the
+ * different fields used by Slider
+ *
+ * The original design here had a requestID merged with the role, to
+ * track outstanding requests. However, this isn't possible, so
+ * the request ID has been dropped. A "location specified" flag was
+ * added to indicate whether or not the request was for a specific location
+ * -though this is currently unused.
+ *
+ * The methods are effectively surplus -but retained to preserve the
+ * option of changing behavior in future
+ */
+public final class ContainerPriority {
+
+  /**
+   * Marker bit: set in a priority when <i>no</i> location was specified,
+   * clear when one was.
+   */
+  static final int NOLOCATION = 1 << 30;
+
+  /**
+   * Build a priority value from a role and the location flag.
+   * @param role role ID
+   * @param locationSpecified true if the request is for a specific location
+   * @return the role, with the {@link #NOLOCATION} bit set
+   * if no location was specified
+   */
+  public static int buildPriority(int role,
+      boolean locationSpecified) {
+    int location = locationSpecified ? 0 : NOLOCATION;
+    return role | location;
+  }
+
+  /**
+   * Create a priority record from a role and the location flag.
+   * @param role role ID
+   * @param locationSpecified true if the request is for a specific location
+   * @return a new record whose priority is the value returned by
+   * {@link #buildPriority(int, boolean)}
+   */
+  public static Priority createPriority(int role,
+      boolean locationSpecified) {
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(ContainerPriority.buildPriority(role,
+        locationSpecified));
+    return pri;
+  }
+
+  /**
+   * Extract the role from a priority value.
+   * @param priority priority value
+   * @return the priority with the {@link #NOLOCATION} marker removed when
+   * the value is at or above that marker; otherwise the value unchanged
+   */
+  public static int extractRole(int priority) {
+    return priority >= NOLOCATION ? priority ^ NOLOCATION : priority;
+  }
+
+  /**
+   * Does the priority have a location?
+   * <p>
+   * {@link #buildPriority(int, boolean)} sets the {@link #NOLOCATION} bit
+   * only when no location was specified, so a location is present exactly
+   * when that bit is clear. The test must mask the bit rather than compare
+   * the whole value, as the role shares the same integer.
+   * @param priority priority index
+   * @return true if the priority was built with a location specified
+   */
+  public static boolean hasLocation(int priority) {
+    return (priority & NOLOCATION) == 0;
+  }
+
+  /**
+   * Map from a container to a role key by way of its priority
+   * @param container container
+   * @return role key
+   */
+  public static int extractRole(Container container) {
+    Priority priority = container.getPriority();
+    return extractRole(priority);
+  }
+
+  /**
+   * Priority record to role mapper
+   * @param priorityRecord priority record; must not be null
+   * @return the role #
+   * @throws NullPointerException if the record is null
+   */
+  public static int extractRole(Priority priorityRecord) {
+    Preconditions.checkNotNull(priorityRecord);
+    return extractRole(priorityRecord.getPriority());
+  }
+
+  /**
+   * Convert a priority record to a string, extracting role and locality
+   * @param priorityRecord priority record. May be null
+   * @return a string value; "(null)" if the record is null
+   */
+  public static String toString(Priority priorityRecord) {
+    if (priorityRecord == null) {
+      return "(null)";
+    }
+    return String.format(Locale.ENGLISH,
+        "role %d (locality=%b)",
+        extractRole(priorityRecord),
+        hasLocation(priorityRecord.getPriority()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerReleaseSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerReleaseSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerReleaseSelector.java
new file mode 100644
index 0000000..fafbada
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerReleaseSelector.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import java.util.List;
+
+/**
+ * Interface implemented by anything that must choose containers to release
+ *
+ */
+public interface ContainerReleaseSelector {
+
+ /**
+ * Given a list of candidate containers, return them sorted into the order
+ * in which they should be released.
+ * NOTE(review): the interface does not say whether the supplied list may be
+ * modified; an implementation may sort it in place and return that same list.
+ * @param roleId ID of the role the candidates are being released for
+ * (presumably the role priority/key -confirm against callers)
+ * @param candidates candidate list ... everything considered suitable
+ * @return the list of candidates, in release order
+ */
+ List<RoleInstance> sortCandidates(int roleId,
+ List<RoleInstance> candidates);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java
new file mode 100644
index 0000000..38c5b8e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import org.apache.slider.common.tools.Comparators;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Release selector which orders the candidates so that the most recently
+ * created container comes first.
+ */
+public class MostRecentContainerReleaseSelector implements ContainerReleaseSelector {
+
+  /**
+   * Sort the supplied list in place and hand the same list back.
+   * @param roleId role ID; not used by this selector
+   * @param candidates candidate list; reordered by this call
+   * @return the supplied list, after sorting
+   */
+  @Override
+  public List<RoleInstance> sortCandidates(int roleId,
+      List<RoleInstance> candidates) {
+    Comparator<RoleInstance> newestFirst = new NewerThan();
+    Collections.sort(candidates, newestFirst);
+    return candidates;
+  }
+
+  /**
+   * Orders role instances by applying a reversed long comparator
+   * to their {@code createTime} fields.
+   */
+  private static class NewerThan implements Comparator<RoleInstance>, Serializable {
+    private final Comparator<Long> reversedTimeOrder =
+        new Comparators.ComparatorReverser<>(new Comparators.LongComparator());
+
+    @Override
+    public int compare(RoleInstance left, RoleInstance right) {
+      return reversedTimeOrder.compare(left.createTime, right.createTime);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java
new file mode 100644
index 0000000..eb8ff03
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.slider.api.types.NodeEntryInformation;
+
+/**
+ * Information about the state of a role on a specific node instance.
+ * No fields are synchronized; sync on the instance to work with it
+ * <p>
+ * The two fields `releasing` and `requested` are used to track the ongoing
+ * state of YARN requests; they do not need to be persisted across stop/start
+ * cycles. They may be relevant across AM restart, but without other data
+ * structures in the AM, not enough to track what the AM was up to before
+ * it was restarted. The strategy will be to ignore unexpected allocation
+ * responses (which may come from pre-restart) requests, while treating
+ * unexpected container release responses as failures.
+ * <p>
+ * The `active` counter is only decremented after a container release response
+ * has been received.
+ * <p>
+ * Every public accessor and mutator locks the instance, so individual calls
+ * are consistent; compound operations still need external synchronization
+ * on the instance.
+ */
+public class NodeEntry implements Cloneable {
+
+  /** Priority of the role this entry tracks. */
+  public final int rolePriority;
+
+  /**
+   * Create an entry with all counters at zero.
+   * @param rolePriority priority of the role
+   */
+  public NodeEntry(int rolePriority) {
+    this.rolePriority = rolePriority;
+  }
+
+  /**
+   * instance explicitly requested on this node: it's OK if an allocation
+   * comes in that has not been (and when that happens, this count should
+   * not drop).
+   */
+  private int requested;
+
+  /** number of starting instances */
+  private int starting;
+
+  /** incrementing counter of instances that failed to start */
+  private int startFailed;
+
+  /** incrementing counter of instances that failed */
+  private int failed;
+
+  /**
+   * Counter of "failed recently" events. These are all failures
+   * which have happened since it was last reset.
+   */
+  private int failedRecently;
+
+  /** incrementing counter of instances that have been pre-empted. */
+  private int preempted;
+
+  /**
+   * Number of live instances.
+   */
+  private int live;
+
+  /** number of containers being released off this node */
+  private int releasing;
+
+  /** timestamp of last use */
+  private long lastUsed;
+
+  /**
+   * Is the node available for assignments? That is, it is
+   * not running any instances of this type, nor are there
+   * any requests outstanding for it.
+   * @return true if a new request could be issued without taking
+   * the number of instances &gt; 1.
+   */
+  public synchronized boolean isAvailable() {
+    return live + requested + starting - releasing <= 0;
+  }
+
+  /**
+   * Are the anti-affinity constraints held. That is, zero or one
+   * node running or starting
+   * @return true if the constraint holds.
+   */
+  public synchronized boolean isAntiAffinityConstraintHeld() {
+    return (live - releasing + starting) <= 1;
+  }
+
+  /**
+   * return no of active instances -those that could be released as they
+   * are live and not already being released
+   * @return a number, possibly 0
+   */
+  public synchronized int getActive() {
+    return (live - releasing);
+  }
+
+  /**
+   * Return true if the node is not busy, and it
+   * has not been used since the absolute time
+   * @param absoluteTime time
+   * @return true if the node could be cleaned up
+   */
+  public synchronized boolean notUsedSince(long absoluteTime) {
+    return isAvailable() && lastUsed < absoluteTime;
+  }
+
+  /**
+   * @return the number of live instances
+   */
+  public synchronized int getLive() {
+    return live;
+  }
+
+  /**
+   * Synchronized like every other accessor, so that the value read is
+   * the one most recently written under the instance lock.
+   * @return the number of starting instances
+   */
+  public synchronized int getStarting() {
+    return starting;
+  }
+
+  /**
+   * Set the live value directly -used on AM restart
+   * @param v value
+   */
+  public synchronized void setLive(int v) {
+    live = v;
+  }
+
+  private synchronized void incLive() {
+    ++live;
+  }
+
+  private synchronized void decLive() {
+    live = RoleHistoryUtils.decToFloor(live);
+  }
+
+  /**
+   * An instance is starting: increment the starting counter.
+   */
+  public synchronized void onStarting() {
+    ++starting;
+  }
+
+  /**
+   * Decrement the starting counter via {@link RoleHistoryUtils#decToFloor}.
+   * Not synchronized itself: every caller already holds the instance lock.
+   */
+  private void decStarting() {
+    starting = RoleHistoryUtils.decToFloor(starting);
+  }
+
+  /**
+   * An instance has started: it moves from starting to live.
+   */
+  public synchronized void onStartCompleted() {
+    decStarting();
+    incLive();
+  }
+
+  /**
+   * start failed -decrement the starting flag.
+   * <p>
+   * NOTE(review): this is then handled as an unplanned completion with
+   * outcome {@link ContainerOutcome#Failed}, which also decrements the
+   * live counter -confirm that this is intended for an instance which
+   * never finished starting.
+   * @return true if the node is now available
+   */
+  public synchronized boolean onStartFailed() {
+    decStarting();
+    ++startFailed;
+    return containerCompleted(false, ContainerOutcome.Failed);
+  }
+
+  /**
+   * no of requests made of this role of this node. If it goes above
+   * 1 there's a problem
+   * @return the request count
+   */
+  public synchronized int getRequested() {
+    return requested;
+  }
+
+  /**
+   * request a node:
+   */
+  public synchronized void request() {
+    ++requested;
+  }
+
+  /**
+   * A request made explicitly to this node has completed
+   */
+  public synchronized void requestCompleted() {
+    requested = RoleHistoryUtils.decToFloor(requested);
+  }
+
+  /**
+   * No of instances in release state
+   * @return the releasing count
+   */
+  public synchronized int getReleasing() {
+    return releasing;
+  }
+
+  /**
+   * Release an instance -which is no longer marked as active
+   */
+  public synchronized void release() {
+    releasing++;
+  }
+
+  /**
+   * completion event, which can be planned or unplanned.
+   * planned: dec our release count
+   * unplanned: update the failure/preemption counters from the outcome
+   * In both cases the live count is then decremented.
+   * @param wasReleased true if this was planned
+   * @param outcome outcome of the container; only examined when the
+   * completion was not planned
+   * @return true if this node is now available
+   */
+  public synchronized boolean containerCompleted(boolean wasReleased, ContainerOutcome outcome) {
+    if (wasReleased) {
+      releasing = RoleHistoryUtils.decToFloor(releasing);
+    } else {
+      // for the node, we use the outcome of the failure to decide
+      // whether this is potentially "node-related"
+      switch (outcome) {
+        // general "any reason" app failure
+        case Failed:
+        // specific node failure
+        case Node_failure:
+          ++failed;
+          ++failedRecently;
+          break;
+
+        case Preempted:
+          preempted++;
+          break;
+
+        // failures which are node-independent
+        case Failed_limits_exceeded:
+        case Completed:
+        default:
+          break;
+      }
+    }
+    decLive();
+    return isAvailable();
+  }
+
+  /**
+   * Time last used.
+   * @return the timestamp of last use
+   */
+  public synchronized long getLastUsed() {
+    return lastUsed;
+  }
+
+  /**
+   * Set the time last used.
+   * @param lastUsed timestamp of last use
+   */
+  public synchronized void setLastUsed(long lastUsed) {
+    this.lastUsed = lastUsed;
+  }
+
+  /**
+   * @return the count of instances that failed to start
+   */
+  public synchronized int getStartFailed() {
+    return startFailed;
+  }
+
+  /**
+   * @return the count of instances that failed
+   */
+  public synchronized int getFailed() {
+    return failed;
+  }
+
+  /**
+   * @return the count of failures since the last reset
+   */
+  public synchronized int getFailedRecently() {
+    return failedRecently;
+  }
+
+  @VisibleForTesting
+  public synchronized void setFailedRecently(int failedRecently) {
+    this.failedRecently = failedRecently;
+  }
+
+  /**
+   * @return the count of instances that have been pre-empted
+   */
+  public synchronized int getPreempted() {
+    return preempted;
+  }
+
+
+  /**
+   * Reset the failed recently count.
+   */
+  public synchronized void resetFailedRecently() {
+    failedRecently = 0;
+  }
+
+  /**
+   * Build a string listing every counter, taken under the instance lock
+   * so that the values form a consistent snapshot.
+   * @return a description of this entry
+   */
+  @Override
+  public synchronized String toString() {
+    final StringBuilder sb = new StringBuilder("NodeEntry{");
+    sb.append("priority=").append(rolePriority);
+    sb.append(", requested=").append(requested);
+    sb.append(", starting=").append(starting);
+    sb.append(", live=").append(live);
+    sb.append(", releasing=").append(releasing);
+    sb.append(", lastUsed=").append(lastUsed);
+    sb.append(", failedRecently=").append(failedRecently);
+    sb.append(", preempted=").append(preempted);
+    sb.append(", startFailed=").append(startFailed);
+    sb.append(", failed=").append(failed);
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Produced a serialized form which can be served up as JSON
+   * @return a summary of the current role status.
+   */
+  public synchronized NodeEntryInformation serialize() {
+    NodeEntryInformation info = new NodeEntryInformation();
+    info.priority = rolePriority;
+    info.requested = requested;
+    info.releasing = releasing;
+    info.starting = starting;
+    info.startFailed = startFailed;
+    info.failed = failed;
+    info.failedRecently = failedRecently;
+    info.preempted = preempted;
+    info.live = live;
+    info.lastUsed = lastUsed;
+    return info;
+  }
+
+  /**
+   * Copy this entry while holding the instance lock, so that the counters
+   * in the copy are mutually consistent.
+   * @return the copy
+   * @throws CloneNotSupportedException if raised by {@link Object#clone()}
+   */
+  @Override
+  public synchronized Object clone() throws CloneNotSupportedException {
+    return super.clone();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java
new file mode 100644
index 0000000..cc17cf0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.slider.api.types.NodeInformation;
+import org.apache.slider.common.tools.Comparators;
+import org.apache.slider.common.tools.SliderUtils;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+
+/**
+ * A node instance -stores information about a node in the cluster.
+ * <p>
+ * Operations on the array/set of roles are synchronized.
+ */
+public class NodeInstance {
+
+ public final String hostname;
+
+ /**
+ * last state of node. Starts off as {@link NodeState#RUNNING},
+ * on the assumption that it is live.
+ */
+ private NodeState nodeState = NodeState.RUNNING;
+
+ /**
+ * Last node report. If null: none
+ */
+ private NodeReport nodeReport = null;
+
+ /**
+ * time of state update
+ */
+ private long nodeStateUpdateTime = 0;
+
+ /**
+ * Node labels.
+ *
+ * IMPORTANT: we assume that there is one label/node, which is the policy
+ * for Hadoop as of November 2015
+ */
+ private String nodeLabels = "";
+
+ /**
+ * An unordered list of node entries of specific roles. There's nothing
+ * indexed so as to support sparser datastructures.
+ */
+ private final List<NodeEntry> nodeEntries;
+
+ /**
+ * Create an instance and the (empty) list of node entries.
+ * @param hostname hostname of the node; stored as given and used for
+ * equality, hashing and {@link #toString()}
+ * @param roles role count -the no. of roles; used only as the initial
+ * capacity of the entry list
+ */
+ public NodeInstance(String hostname, int roles) {
+ this.hostname = hostname;
+ nodeEntries = new ArrayList<>(roles);
+ }
+
+ /**
+  * Record the latest report for this node: its health report time, the
+  * report itself, the node state and the node label.
+  * <p>
+  * The result is true if the change is significant enough to trigger a
+  * re-evaluation of pending requests, which is when either
+  * <ol>
+  *   <li>the node was unusable and is now usable, or</li>
+  *   <li>the node is usable and its label differs from the previous one.</li>
+  * </ol>
+  * A transition from usable to unusable is not significant, nor is a label
+  * change on an unusable node.
+  *
+  * @param report latest node report
+  * @return true if the node state changed enough for a request evaluation.
+  */
+ public synchronized boolean updateNode(NodeReport report) {
+   nodeStateUpdateTime = report.getLastHealthReportTime();
+   nodeReport = report;
+   // capture the old usability before the state is overwritten
+   final boolean wasUnusable = nodeState.isUnusable();
+   nodeState = report.getNodeState();
+   final boolean usableNow = !nodeState.isUnusable();
+   final String previousLabels = nodeLabels;
+   nodeLabels = SliderUtils.extractNodeLabel(report);
+   if (wasUnusable && usableNow) {
+     // the node has just become available
+     return true;
+   }
+   return usableNow && !nodeLabels.equals(previousLabels);
+ }
+
+ /**
+ * Get the node label string. It starts as "" and is replaced on every
+ * call to {@link #updateNode(NodeReport)}.
+ * NOTE(review): this read is not synchronized, unlike the update.
+ * @return the current label value
+ */
+ public String getNodeLabels() {
+ return nodeLabels;
+ }
+
+ /**
+  * Look up the entry of a role on this node.
+  * @param role role index; matched against the priority of each entry
+  * @return the entry, or null if this node has no entry for the role
+  */
+ public synchronized NodeEntry get(int role) {
+   final int count = nodeEntries.size();
+   for (int i = 0; i < count; i++) {
+     final NodeEntry candidate = nodeEntries.get(i);
+     if (candidate.rolePriority == role) {
+       return candidate;
+     }
+   }
+   return null;
+ }
+
+ /**
+ * Get the entry for a role, creating a new entry and adding it to this
+ * node if there is none.
+ * @param role role index
+ * @return the existing or newly created entry; never null
+ */
+ public synchronized NodeEntry getOrCreate(int role) {
+ NodeEntry entry = get(role);
+ if (entry == null) {
+ entry = new NodeEntry(role);
+ nodeEntries.add(entry);
+ }
+ return entry;
+ }
+
+ /**
+  * Get the node entry matching a container on this node, creating the
+  * entry if needed. The role is extracted from the container via
+  * {@link ContainerPriority#extractRole(Container)}.
+  * @param container container
+  * @return the node entry of the container's role
+  */
+ public NodeEntry getOrCreate(Container container) {
+   final int role = ContainerPriority.extractRole(container);
+   return getOrCreate(role);
+ }
+
+ /**
+  * Count the number of active role instances on this node.
+  * @param role role index
+  * @return 0 if this node has no entry for the role, otherwise the
+  * entry's active count.
+  */
+ public int getActiveRoleInstances(int role) {
+   final NodeEntry entry = get(role);
+   if (entry == null) {
+     return 0;
+   }
+   return entry.getActive();
+ }
+
+ /**
+  * Count the number of live role instances on this node.
+  * @param role role index
+  * @return 0 if this node has no entry for the role, otherwise the
+  * entry's live count.
+  */
+ public int getLiveRoleInstances(int role) {
+   final NodeEntry entry = get(role);
+   if (entry == null) {
+     return 0;
+   }
+   return entry.getLive();
+ }
+
+ /**
+ * Is the node considered online?
+ * @return true if the last recorded node state is not an unusable one
+ */
+ public boolean isOnline() {
+ return !nodeState.isUnusable();
+ }
+
+ /**
+  * Query for a node being considered unreliable for a role.
+  * @param role role key
+  * @param threshold recent failure count which must be exceeded for the
+  * node to be considered unreliable
+  * @return true if the node has an entry for the role and its recent
+  * failure count is above the threshold
+  */
+ public boolean isConsideredUnreliable(int role, int threshold) {
+   final NodeEntry entry = get(role);
+   if (entry == null) {
+     // no history for the role on this node
+     return false;
+   }
+   return entry.getFailedRecently() > threshold;
+ }
+
+ /**
+  * Remove the entry for a role, if there is one.
+  * @param role the role index
+  * @return the entry that was removed, or null if there was none
+  */
+ public synchronized NodeEntry remove(int role) {
+   final NodeEntry existing = get(role);
+   if (existing == null) {
+     return null;
+   }
+   nodeEntries.remove(existing);
+   return existing;
+ }
+
+ /**
+ * Replace the entry of a role: any existing entry for {@code role} is
+ * removed, then the supplied entry is added.
+ * @param role role index whose current entry is removed
+ * @param nodeEntry entry to add. NOTE(review): its rolePriority is not
+ * checked against {@code role} and null is not rejected -confirm callers.
+ */
+ public synchronized void set(int role, NodeEntry nodeEntry) {
+ remove(role);
+ nodeEntries.add(nodeEntry);
+ }
+
+ /**
+  * Run through each entry, removing those which have not been used since
+  * the given time and which have no recent failures (entries with recent
+  * failures are retained, as that history is still of interest).
+  * @param absoluteTime time passed to each entry's notUsedSince() check
+  * @return true if there are still entries left
+  */
+ public synchronized boolean purgeUnusedEntries(long absoluteTime) {
+   boolean anyRetained = false;
+   for (ListIterator<NodeEntry> it = nodeEntries.listIterator(); it.hasNext();) {
+     final NodeEntry candidate = it.next();
+     final boolean keep = !candidate.notUsedSince(absoluteTime)
+         || candidate.getFailedRecently() != 0;
+     if (keep) {
+       anyRetained = true;
+     } else {
+       it.remove();
+     }
+   }
+   return anyRetained;
+ }
+
+
+ /**
+  * Reset the recent failure count of every entry on this node.
+  */
+ public synchronized void resetFailedRecently() {
+   final int count = nodeEntries.size();
+   for (int i = 0; i < count; i++) {
+     nodeEntries.get(i).resetFailedRecently();
+   }
+ }
+
+ /**
+ * The short string form is just the hostname;
+ * see {@link #toFullString()} for a dump which includes the entries.
+ * @return the hostname
+ */
+ @Override
+ public String toString() {
+ return hostname;
+ }
+
+ /**
+  * Full dump of the node including its entries: the hostname followed by
+  * one line per entry, each prefixed with the entry's role priority.
+  * <p>
+  * Synchronized so that the entry list cannot be modified by another
+  * thread while it is being iterated over; all operations which change
+  * the list hold the same lock.
+  * @return a multi-line description of the node
+  */
+ public synchronized String toFullString() {
+   final StringBuilder sb =
+       new StringBuilder(toString());
+   sb.append("{ ");
+   for (NodeEntry entry : nodeEntries) {
+     sb.append(String.format("\n [%02d] ", entry.rolePriority));
+     sb.append(entry.toString());
+   }
+   sb.append("} ");
+   return sb.toString();
+ }
+
+ /**
+  * Equality test is purely on the hostname: two instances of exactly the
+  * same class are equal if their hostnames are equal.
+  * @param o other
+  * @return true if the hostnames are equal
+  */
+ @Override
+ public boolean equals(Object o) {
+   if (o == this) {
+     return true;
+   }
+   final boolean sameType = o != null && getClass() == o.getClass();
+   return sameType && hostname.equals(((NodeInstance) o).hostname);
+ }
+
+ /**
+ * The hash code is that of the hostname, matching the hostname-only
+ * equality test of {@link #equals(Object)}.
+ * @return the hostname's hash code
+ */
+ @Override
+ public int hashCode() {
+ return hostname.hashCode();
+ }
+
+
+ /**
+  * Predicate to query if the number of recent failures of a role
+  * on this node exceeds that role's node failure threshold.
+  * If there is no entry for that role on this node, the failure count
+  * is taken as "0".
+  * <p>
+  * A negative threshold always results in "true", whatever the
+  * failure count is.
+  * @param role role to look up
+  * @return true if the threshold is negative or the recent failure count
+  * is above it.
+  */
+ public boolean exceedsFailureThreshold(RoleStatus role) {
+   final NodeEntry entry = get(role.getKey());
+   int recentFailures = 0;
+   if (entry != null) {
+     recentFailures = entry.getFailedRecently();
+   }
+   final int threshold = role.getNodeFailureThreshold();
+   if (threshold < 0) {
+     return true;
+   }
+   return recentFailures > threshold;
+ }
+
+ /**
+  * Build a summary of this node which can be served up as JSON: hostname,
+  * state, update time, labels, details from the last node report (if
+  * there has been one) and the serialized form of every entry.
+  * @param naming map of priority -> name for naming entries; an entry whose
+  * priority has no mapping is named after the priority's integer value
+  * @return a summary of the current node status.
+  */
+ public synchronized NodeInformation serialize(Map<Integer, String> naming) {
+   final NodeInformation result = new NodeInformation();
+   result.hostname = hostname;
+   // String.valueOf() copes with a null state
+   result.state = String.valueOf(nodeState);
+   result.lastUpdated = nodeStateUpdateTime;
+   result.labels = nodeLabels;
+   final NodeReport report = nodeReport;
+   if (report != null) {
+     result.httpAddress = report.getHttpAddress();
+     result.rackName = report.getRackName();
+     result.healthReport = report.getHealthReport();
+   }
+   result.entries = new HashMap<>(nodeEntries.size());
+   for (NodeEntry entry : nodeEntries) {
+     final String mapped = naming.get(entry.rolePriority);
+     final String key = mapped != null
+         ? mapped
+         : Integer.toString(entry.rolePriority);
+     result.entries.put(key, entry.serialize());
+   }
+   return result;
+ }
+
+ /**
+  * Is this node instance a suitable candidate for the specific role?
+  * <p>
+  * The checks are made in order: the node must be online, the label must
+  * match, and the node's entry for the role must be available. The entry
+  * is created if it does not exist, but only once the first two checks
+  * have passed.
+  * @param role role ID
+  * @param label label which must match, or unset for no label checks
+  * @return true if the node is online, the labels match and the entry of
+  * the role reports itself as available.
+  */
+ public boolean canHost(int role, String label) {
+   if (!isOnline()) {
+     return false;
+   }
+   final boolean labelMatches =
+       SliderUtils.isUnset(label) || label.equals(nodeLabels);
+   if (!labelMatches) {
+     return false;
+   }
+   return getOrCreate(role).isAvailable();
+ }
+
+ /**
+  * A comparator for sorting nodes by how preferred they are for a role.
+  *
+  * The exact algorithm may change: the current policy compares the lastUsed
+  * time of each node's entry for the role, treating a node with no entry
+  * for the role as having a lastUsed time of -1. The ordering of the two
+  * values is delegated to {@link Comparators.InvertedLongComparator}.
+  * NOTE(review): presumably this sorts the most recently used node
+  * first -confirm against that comparator's implementation.
+  */
+ public static class Preferred implements Comparator<NodeInstance>, Serializable {
+
+   private static final Comparators.InvertedLongComparator comparator =
+       new Comparators.InvertedLongComparator();
+   private final int role;
+
+   public Preferred(int role) {
+     this.role = role;
+   }
+
+   @Override
+   public int compare(NodeInstance o1, NodeInstance o2) {
+     final NodeEntry first = o1.get(role);
+     final NodeEntry second = o2.get(role);
+     long firstUsed = -1;
+     if (first != null) {
+       firstUsed = first.getLastUsed();
+     }
+     long secondUsed = -1;
+     if (second != null) {
+       secondUsed = second.getLastUsed();
+     }
+     return comparator.compare(firstUsed, secondUsed);
+   }
+ }
+
+ /**
+  * A comparator for sorting nodes by the number of active instances of
+  * a role, most active first.
+  * <p>
+  * Only the count returned by
+  * {@link NodeInstance#getActiveRoleInstances(int)} is compared; the
+  * lastUsed field is not consulted.
+  */
+ public static class MoreActiveThan implements Comparator<NodeInstance>,
+     Serializable {
+
+   private final int role;
+
+   /**
+    * @param role index of the role whose active instances are counted
+    */
+   public MoreActiveThan(int role) {
+     this.role = role;
+   }
+
+   /**
+    * @param left first node
+    * @param right second node
+    * @return a negative value if left has more active instances than right,
+    * positive if it has fewer, 0 if the counts are equal
+    */
+   @Override
+   public int compare(NodeInstance left, NodeInstance right) {
+     int activeLeft = left.getActiveRoleInstances(role);
+     int activeRight = right.getActiveRoleInstances(role);
+     // arguments are reversed so that the higher count sorts first;
+     // Integer.compare() cannot overflow, unlike subtracting the values
+     return Integer.compare(activeRight, activeLeft);
+   }
+ }
+ /**
+ * A comparator for sorting nodes by hostname, using the natural
+ * ordering of {@link String#compareTo(String)}.
+ */
+ public static class CompareNames implements Comparator<NodeInstance>,
+ Serializable {
+
+ public CompareNames() {
+ }
+
+ @Override
+ public int compare(NodeInstance left, NodeInstance right) {
+ return left.hostname.compareTo(right.hostname);
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java
new file mode 100644
index 0000000..3858b68
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Node map -and methods to work with it.
+ * Not Synchronized: caller is expected to lock access.
+ */
+public class NodeMap extends HashMap<String, NodeInstance> {
+ protected static final Logger log =
+ LoggerFactory.getLogger(NodeMap.class);
+
+ /**
+ * number of roles
+ */
+ private final int roleSize;
+
+ /**
+ * Construct an empty node map.
+ * @param roleSize number of roles; passed to every {@link NodeInstance}
+ * created by {@link #getOrCreate(String)}
+ */
+ public NodeMap(int roleSize) {
+ this.roleSize = roleSize;
+ }
+
+ /**
+  * Get the node instance of a host, creating it and adding it to the
+  * map if there is no instance for that hostname.
+  * @param hostname node
+  * @return the existing or newly created instance
+  */
+ public NodeInstance getOrCreate(String hostname) {
+   final NodeInstance existing = get(hostname);
+   if (existing != null) {
+     return existing;
+   }
+   final NodeInstance created = new NodeInstance(hostname, roleSize);
+   put(hostname, created);
+   return created;
+ }
+
+ /**
+  * List the nodes which have at least one active instance of a role.
+  * @param role role
+  * @return a possibly empty list of all nodes that are active
+  * in that role, sorted with {@link NodeInstance.MoreActiveThan}
+  */
+ public List<NodeInstance> listActiveNodes(int role) {
+   final List<NodeInstance> active = new ArrayList<>();
+   for (NodeInstance candidate : values()) {
+     if (candidate.getActiveRoleInstances(role) <= 0) {
+       continue;
+     }
+     active.add(candidate);
+   }
+   final NodeInstance.MoreActiveThan byActivity =
+       new NodeInstance.MoreActiveThan(role);
+   Collections.sort(active, byActivity);
+   return active;
+ }
+
+ /**
+  * Reset the failed recently counters of every node in the map.
+  */
+ public void resetFailedRecently() {
+   for (NodeInstance node : values()) {
+     node.resetFailedRecently();
+   }
+ }
+
+ /**
+  * Update the state of a node, creating the node if it is not yet in
+  * the map. The result is true if the node was created by this call, or
+  * if {@link NodeInstance#updateNode(NodeReport)} reported a change.
+  *
+  * @param hostname host name
+  * @param report latest node report
+  * @return true if the node state changed enough for a request evaluation.
+  */
+ public boolean updateNode(String hostname, NodeReport report) {
+   // must be determined before getOrCreate() adds the node
+   final boolean isNewNode = get(hostname) == null;
+   final boolean changed = getOrCreate(hostname).updateNode(report);
+   return isNewNode || changed;
+ }
+
+ /**
+ * Clone point: delegates to the superclass clone operation.
+ * @return a shallow clone
+ */
+ @Override
+ public Object clone() {
+ return super.clone();
+ }
+
+ /**
+ * Insert a collection of nodes into the map, each keyed by its hostname;
+ * any existing entry with the same hostname is overwritten.
+ * This is a bulk operation for testing.
+ * @param nodes collection of nodes.
+ */
+ @VisibleForTesting
+ public void insert(Collection<NodeInstance> nodes) {
+ for (NodeInstance node : nodes) {
+ put(node.hostname, node);
+ }
+ }
+
+ /**
+  * Test helper: build or update a cluster from a list of node reports.
+  * Each report is applied to the node named by the host of the report's
+  * node ID, the node being created if needed.
+  * @param reports the list of reports
+  * @return true if {@link NodeInstance#updateNode(NodeReport)} returned
+  * true for at least one of the reports
+  */
+ @VisibleForTesting
+ public boolean buildOrUpdate(List<NodeReport> reports) {
+   boolean changed = false;
+   for (NodeReport report : reports) {
+     final String host = report.getNodeId().getHost();
+     final boolean nodeChanged = getOrCreate(host).updateNode(report);
+     changed = changed || nodeChanged;
+   }
+   return changed;
+ }
+
+ /**
+  * Scan the current node map for all nodes capable of hosting an instance,
+  * as decided by {@link NodeInstance#canHost(int, String)}.
+  * @param role role ID
+  * @param label label which must match, or "" for no label checks
+  * @return a possibly empty list of node instances matching the criteria,
+  * sorted by hostname.
+  */
+ public List<NodeInstance> findAllNodesForRole(int role, String label) {
+   final List<NodeInstance> candidates = new ArrayList<>(size());
+   for (NodeInstance node : values()) {
+     if (!node.canHost(role, label)) {
+       continue;
+     }
+     candidates.add(node);
+   }
+   Collections.sort(candidates, new NodeInstance.CompareNames());
+   return candidates;
+ }
+
+ /**
+  * Dump the map: the full string form of every node, one per line, in
+  * sorted hostname order.
+  * @return a multi-line description of the map
+  */
+ @Override
+ public synchronized String toString() {
+   final List<String> hostnames = new ArrayList<>(keySet());
+   Collections.sort(hostnames);
+   final StringBuilder text = new StringBuilder("NodeMap{");
+   for (String host : hostnames) {
+     text.append(host)
+         .append(": ")
+         .append(get(host).toFullString())
+         .append("\n");
+   }
+   return text.append('}').toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
new file mode 100644
index 0000000..4357ef8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.server.appmaster.operations.CancelSingleRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tracks an outstanding request. This is used to correlate an allocation response
+ * with the node and role used in the request.
+ * <p>
+ * The node identifier may be null -which indicates that a request was made without
+ * a specific target node
+ * <p>
+ * Equality and the hash code are based <i>only</i> on the role and hostname,
+ * which are fixed in the constructor. This means that a simple
+ * instance constructed with (role, hostname) can be used to look up
+ * a complete request instance in the {@link OutstandingRequestTracker} map
+ */
+public final class OutstandingRequest extends RoleHostnamePair {
+ protected static final Logger log =
+ LoggerFactory.getLogger(OutstandingRequest.class);
+
+ /**
+ * Node the request is for -may be null
+ */
+ public final NodeInstance node;
+
+ /**
+ * A list of all possible nodes to list in an AA request. For a non-AA
+ * request where {@link #node} is set, element 0 of the list is the same
+ * value.
+ */
+ public final List<NodeInstance> nodes = new ArrayList<>(1);
+
+ /**
+ * Optional label. This is cached as the request option (explicit-location + label) is forbidden,
+ * yet the label needs to be retained for escalation.
+ */
+ public String label;
+
+ /**
+ * The most recently built container request.
+ * <p>
+ * Only valid after {@link #buildContainerRequest(Resource, RoleStatus, long)}
+ */
+ private AMRMClient.ContainerRequest issuedRequest;
+
+ /**
+ * Requested time in millis.
+ * <p>
+ * Only valid after {@link #buildContainerRequest(Resource, RoleStatus, long)}
+ */
+ private long requestedTimeMillis;
+
+ /**
+ * Time in millis after which escalation should be triggered.
+ * <p>
+ * Only valid after {@link #buildContainerRequest(Resource, RoleStatus, long)}
+ */
+ private long escalationTimeoutMillis;
+
+ /**
+ * Has the placement request been escalated?
+ */
+ private boolean escalated;
+
+ /**
+ * Flag to indicate that escalation is allowed
+ */
+ private boolean mayEscalate;
+
+ /**
+ * Priority of request; only valid after the request is built up
+ */
+ private int priority = -1;
+
+ /**
+ * Is this an Anti-affine request which should be cancelled on
+ * a cluster resize?
+ */
+ private boolean antiAffine = false;
+
+ /**
+ * Create a request for a role, optionally targeted at a node.
+ * The hostname of the request is that of the node, or null if there is
+ * no node.
+ * NOTE(review): the node is added to {@link #nodes} even when it is null,
+ * which leaves a null element in that list -confirm that no caller
+ * iterates over the list of such a request.
+ * @param roleId role
+ * @param node node -can be null
+ */
+ public OutstandingRequest(int roleId,
+ NodeInstance node) {
+ super(roleId, node != null ? node.hostname : null);
+ this.node = node;
+ nodes.add(node);
+ }
+
+ /**
+ * Create an outstanding request with the given role and hostname.
+ * Important: this is useful only for map lookups -the other constructor
+ * with the NodeInstance parameter is needed to generate node-specific
+ * container requests. Here {@link #node} is null and {@link #nodes}
+ * is left empty.
+ * @param roleId role
+ * @param hostname hostname
+ */
+ public OutstandingRequest(int roleId, String hostname) {
+ super(roleId, hostname);
+ this.node = null;
+ }
+
+ /**
+ * Create an Anti-affine request, including all listed nodes (there must
+ * be at least one) as targets. The hostname of the request is that of
+ * the first node in the list; {@link #node} itself is null.
+ * @param roleId role
+ * @param nodes list of nodes; must not be empty as element 0 is read
+ */
+ public OutstandingRequest(int roleId, List<NodeInstance> nodes) {
+ super(roleId, nodes.get(0).hostname);
+ this.node = null;
+ this.antiAffine = true;
+ this.nodes.addAll(nodes);
+ }
+
+ /**
+ * Is the request located in the cluster, that is: does it have a node.
+ * The hostname-only and anti-affine constructors set the node to null,
+ * so requests created through them are never located.
+ * @return true if a non-null node instance was supplied in the constructor
+ */
+ public boolean isLocated() {
+ return node != null;
+ }
+
+ /**
+ * Get the time recorded as the request time by
+ * {@link #buildContainerRequest(Resource, RoleStatus, long)}.
+ * @return the requested time in millis; 0 until a request is built
+ */
+ public long getRequestedTimeMillis() {
+ return requestedTimeMillis;
+ }
+
+ /**
+ * Get the time after which escalation should be triggered, as set by
+ * {@link #buildContainerRequest(Resource, RoleStatus, long)}.
+ * @return the escalation time in millis; 0 until a request is built
+ */
+ public long getEscalationTimeoutMillis() {
+ return escalationTimeoutMillis;
+ }
+
+ /**
+ * Has the placement request been escalated?
+ * @return the escalated flag
+ */
+ public synchronized boolean isEscalated() {
+ return escalated;
+ }
+
+ /**
+ * Is escalation of this request allowed?
+ * @return the flag set when the container request was built
+ */
+ public boolean mayEscalate() {
+ return mayEscalate;
+ }
+
+ /**
+ * Get the container request built by
+ * {@link #buildContainerRequest(Resource, RoleStatus, long)} or
+ * replaced by {@link #escalate()}.
+ * @return the request, or null if none has been built
+ */
+ public AMRMClient.ContainerRequest getIssuedRequest() {
+ return issuedRequest;
+ }
+
+ /**
+ * Get the priority of the request.
+ * @return the priority; -1 until the request is built up
+ */
+ public int getPriority() {
+ return priority;
+ }
+
+ /**
+ * Is this an anti-affine request?
+ * @return the anti-affine flag
+ */
+ public boolean isAntiAffine() {
+ return antiAffine;
+ }
+
+ /**
+ * Set the anti-affine flag. When set,
+ * {@link #buildContainerRequest(Resource, RoleStatus, long)} builds a
+ * request listing the hostname of every element of {@link #nodes}.
+ * @param antiAffine new value of the flag
+ */
+ public void setAntiAffine(boolean antiAffine) {
+ this.antiAffine = antiAffine;
+ }
+
+ /**
+  * Build a container request.
+  * <p>
+  * There are three cases:
+  * <ol>
+  *   <li>Anti-affine: every node in {@link #nodes} is listed as a host;
+  *   locality is not relaxed and escalation is disabled.</li>
+  *   <li>{@link #node} is set: that node's hostname is the only host
+  *   listed; locality is not relaxed, and escalation is allowed unless the
+  *   role has strict placement.</li>
+  *   <li>Otherwise: no hosts are listed, locality is relaxed, the role's
+  *   label expression is set on the request, and the request is marked as
+  *   already escalated.</li>
+  * </ol>
+  * In the first two cases the label is not set on the request, but it is
+  * cached in {@link #label} for use when escalating.
+  * <p>
+  * This operation sets the requested time and the escalation timeout,
+  * used for tracking timeouts on outstanding requests.
+  * @param resource resource
+  * @param role role
+  * @param time time in millis to record as request time
+  * @return the request to raise
+  * @throws IllegalArgumentException if resource or role is null
+  * @throws InvalidContainerRequestException if the request built fails
+  * {@link #validate()}
+  */
+ public synchronized AMRMClient.ContainerRequest buildContainerRequest(
+     Resource resource, RoleStatus role, long time) {
+   Preconditions.checkArgument(resource != null, "null `resource` arg");
+   Preconditions.checkArgument(role != null, "null `role` arg");
+
+   // cache label for escalation
+   label = role.getLabelExpression();
+   requestedTimeMillis = time;
+   // the long constant forces 64-bit arithmetic, so that the conversion
+   // from seconds cannot overflow whatever the width of the timeout value
+   escalationTimeoutMillis = time + role.getPlacementTimeoutSeconds() * 1000L;
+   String[] hosts;
+   boolean relaxLocality;
+   boolean strictPlacement = role.isStrictPlacement();
+   NodeInstance target = this.node;
+   String nodeLabels;
+
+   if (isAntiAffine()) {
+     int size = nodes.size();
+     log.info("Creating anti-affine request across {} nodes; first node = {}",
+         size, hostname);
+     hosts = new String[size];
+     StringBuilder builder = new StringBuilder(size * 16);
+     int c = 0;
+     for (NodeInstance nodeInstance : nodes) {
+       hosts[c++] = nodeInstance.hostname;
+       builder.append(nodeInstance.hostname).append(" ");
+     }
+     log.debug("Full host list: [ {}]", builder);
+     escalated = false;
+     mayEscalate = false;
+     relaxLocality = false;
+     nodeLabels = null;
+   } else if (target != null) {
+     // placed request. Hostname is used in request
+     hosts = new String[1];
+     hosts[0] = target.hostname;
+     // and locality flag is set to false; Slider will decide when
+     // to relax things
+     relaxLocality = false;
+
+     log.info("Submitting request for container on {}", hosts[0]);
+     // enable escalation for all but strict placements.
+     escalated = false;
+     mayEscalate = !strictPlacement;
+     nodeLabels = null;
+   } else {
+     // no hosts
+     hosts = null;
+     // relax locality is mandatory on an unconstrained placement
+     relaxLocality = true;
+     // declare that the placement is implicitly escalated.
+     escalated = true;
+     // and forbid it happening
+     mayEscalate = false;
+     nodeLabels = label;
+   }
+   Priority pri = ContainerPriority.createPriority(roleId, !relaxLocality);
+   priority = pri.getPriority();
+   issuedRequest = new AMRMClient.ContainerRequest(resource,
+       hosts,
+       null,
+       pri,
+       relaxLocality,
+       nodeLabels);
+   validate();
+   return issuedRequest;
+ }
+
+
+ /**
+  * Build an escalated container request, updating {@link #issuedRequest} with
+  * the new value.
+  * <p>
+  * The new request has the same resource requirements as the original one,
+  * relaxed placement, and a changed priority so as to place it into the
+  * relaxed list. If no label is cached, the hosts of the original request
+  * are retained; if there is a label then the label is set and the hosts
+  * are dropped, as a request may not combine a label with hosts.
+  * @return the new container request
+  * @throws NullPointerException if no request has been issued
+  * @throws InvalidContainerRequestException if the new request fails
+  * {@link #validate()}
+  */
+ public synchronized AMRMClient.ContainerRequest escalate() {
+   // template form: the string value of this request is only built
+   // if the check fails
+   Preconditions.checkNotNull(issuedRequest,
+       "cannot escalate if request not issued %s", this);
+   log.debug("Escalating {}", this);
+   escalated = true;
+
+   // this is now the priority
+   // it is tagged as unlocated because it needs to go into a different
+   // set of outstanding requests from the strict placements
+   Priority pri = ContainerPriority.createPriority(roleId, false);
+   // update the field
+   priority = pri.getPriority();
+
+   // named "hosts" so as not to shadow the nodes field
+   String[] hosts;
+   List<String> issuedRequestNodes = issuedRequest.getNodes();
+   if (SliderUtils.isUnset(label) && issuedRequestNodes != null) {
+     hosts = issuedRequestNodes.toArray(new String[issuedRequestNodes.size()]);
+   } else {
+     hosts = null;
+   }
+
+   issuedRequest = new AMRMClient.ContainerRequest(issuedRequest.getCapability(),
+       hosts,
+       null,
+       pri,
+       true,
+       label);
+   validate();
+   return issuedRequest;
+ }
+
+ /**
+  * Mark the request as completed (or canceled).
+  * <p>
+  * Current action: if a node is defined, the node's entry for this role
+  * (created if necessary) is told that the request has completed.
+  */
+ public void completed() {
+   if (node == null) {
+     // unplaced request: nothing to update
+     return;
+   }
+   final NodeEntry entry = node.getOrCreate(roleId);
+   entry.requestCompleted();
+ }
+
+ /**
+  * Query to see if the request is available and ready to be escalated:
+  * escalation must be allowed, must not already have happened, a request
+  * must have been issued and the escalation timeout must be before the
+  * given time.
+  * @param time time to check against
+  * @return true if escalation should begin
+  */
+ public synchronized boolean shouldEscalate(long time) {
+   if (!mayEscalate || escalated) {
+     return false;
+   }
+   if (issuedRequest == null) {
+     return false;
+   }
+   return escalationTimeoutMillis < time;
+ }
+
+ /**
+  * Query for the resource requirements matching; always false before a
+  * request is issued.
+  * @param resource resource to test the capability of the issued request
+  * against
+  * @return true if a request has been issued and
+  * {@code Resources.fitsIn()} reports that its capability fits in the
+  * resource
+  */
+ public synchronized boolean resourceRequirementsMatch(Resource resource) {
+   if (issuedRequest == null) {
+     return false;
+   }
+   return Resources.fitsIn(issuedRequest.getCapability(), resource);
+ }
+
+ /**
+  * Describe the request: role, hostname (if set), node, whether the
+  * priority declares a location, label, timing and escalation state, and
+  * the issued request (or "(null)" if there is none).
+  * @return a string for logging and diagnostics
+  */
+ @Override
+ public String toString() {
+   final boolean located = ContainerPriority.hasLocation(getPriority());
+   final StringBuilder text = new StringBuilder("OutstandingRequest{");
+   text.append("roleId=").append(roleId);
+   if (hostname != null) {
+     text.append(", hostname='").append(hostname).append('\'');
+   }
+   text.append(", node=").append(node)
+       .append(", hasLocation=").append(located)
+       .append(", label=").append(label)
+       .append(", requestedTimeMillis=").append(requestedTimeMillis)
+       .append(", mayEscalate=").append(mayEscalate)
+       .append(", escalated=").append(escalated)
+       .append(", escalationTimeoutMillis=").append(escalationTimeoutMillis);
+   text.append(", issuedRequest=");
+   if (issuedRequest != null) {
+     text.append(SliderUtils.requestToString(issuedRequest));
+   } else {
+     text.append("(null)");
+   }
+   return text.append('}').toString();
+ }
+
+ /**
+ * Create a cancel operation for the issued request.
+ * @return an operation that can be used to cancel the request
+ * @throws IllegalStateException if no request has been issued
+ */
+ public CancelSingleRequest createCancelOperation() {
+ Preconditions.checkState(issuedRequest != null, "No issued request to cancel");
+ return new CancelSingleRequest(issuedRequest);
+ }
+
+ /**
+ * Valid if a node label expression specified on container request is valid or
+ * not. Mimics the logic in AMRMClientImpl, so can be used for preflight checking
+ * and in mock tests
+ *
+ */
+ public void validate() throws InvalidContainerRequestException {
+ Preconditions.checkNotNull(issuedRequest, "request has not yet been built up");
+ AMRMClient.ContainerRequest containerRequest = issuedRequest;
+ String requestDetails = this.toString();
+ validateContainerRequest(containerRequest, priority, requestDetails);
+ }
+
+ /**
+  * Inner validation logic for a container request. A request with no node
+  * label expression is always valid. One with a label expression is
+  * rejected if the expression contains "&&" or "||", or if the request
+  * also lists any racks or nodes.
+  * @param containerRequest request
+  * @param priority raw priority of role (not used in the checks)
+  * @param requestDetails details for error messages
+  * @throws InvalidContainerRequestException if the request is rejected
+  */
+ @VisibleForTesting
+ public static void validateContainerRequest(AMRMClient.ContainerRequest containerRequest,
+     int priority, String requestDetails) {
+   final String expression = containerRequest.getNodeLabelExpression();
+   final List<String> racks = containerRequest.getRacks();
+   final boolean racksListed = racks != null && !racks.isEmpty();
+   final List<String> hosts = containerRequest.getNodes();
+   final boolean hostsListed = hosts != null && !hosts.isEmpty();
+
+   if (!SliderUtils.isSet(expression)) {
+     // no label: none of the restrictions apply
+     return;
+   }
+
+   // Don't support specifying >= 2 node labels in a node label expression now
+   if (expression.contains("&&") || expression.contains("||")) {
+     throw new InvalidContainerRequestException(
+         "Cannot specify more than two node labels"
+             + " in a single node label expression: " + requestDetails);
+   }
+
+   // Don't allow specify node label against ANY request listing hosts or racks
+   if (racksListed || hostsListed) {
+     throw new InvalidContainerRequestException(
+         "Cannot specify node label with rack or node: " + requestDetails);
+   }
+ }
+
+ /**
+ * Create a new role/hostname pair for indexing, built from the role ID
+ * and hostname of this request.
+ * @return a new index.
+ */
+ public RoleHostnamePair getIndex() {
+ return new RoleHostnamePair(roleId, hostname);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
new file mode 100644
index 0000000..c16aa3c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
+import org.apache.slider.server.appmaster.operations.CancelSingleRequest;
+import org.apache.slider.server.appmaster.operations.ContainerRequestOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Tracks outstanding requests made with a specific placement option.
+ * <p>
+ * <ol>
+ * <li>Used to decide when to return a node to 'can request containers here' list</li>
+ * <li>Used to identify requests where placement has timed out, and so issue relaxed requests</li>
+ * </ol>
+ * <p>
+ * If an allocation comes in that is not in the map: either the allocation
+ * was unplaced, or the placed allocation could not be met on the specified
+ * host, and the RM/scheduler fell back to another location.
+ */
+
+public class OutstandingRequestTracker {
+ /** Logger for this class; protected, so also visible to subclasses. */
+ protected static final Logger log =
+ LoggerFactory.getLogger(OutstandingRequestTracker.class);
+
+ /**
+ * no requests; saves creating a new list if not needed.
+ * NOTE(review): this list is mutable and is returned directly by
+ * {@link #escalateOutstandingRequests(long)} when there are no placed requests;
+ * callers should treat it as read-only.
+ */
+ private final List<AbstractRMOperation> NO_REQUESTS = new ArrayList<>(0);
+
+ /**
+ * Requests which report themselves as located, indexed by the
+ * role/hostname pair returned from the request's {@code getIndex()}.
+ */
+ private Map<RoleHostnamePair, OutstandingRequest> placedRequests = new HashMap<>();
+
+ /**
+ * List of open requests; no specific details on them.
+ */
+ private List<OutstandingRequest> openRequests = new ArrayList<>();
+
+  /**
+   * Create and register a request for a role.
+   * <p>
+   * A request which reports itself as located is indexed in
+   * {@link #placedRequests}; any other request is appended to
+   * {@link #openRequests}.
+   * <p>
+   * The request count of the node instance's role is not updated here.
+   * @param instance node instance to manage
+   * @param role role index
+   * @return the newly created request
+   */
+  public synchronized OutstandingRequest newRequest(NodeInstance instance, int role) {
+    final OutstandingRequest created = new OutstandingRequest(role, instance);
+    if (!created.isLocated()) {
+      // no location: track it as an open request
+      openRequests.add(created);
+      return created;
+    }
+    // located: index it by its role/hostname pair
+    placedRequests.put(created.getIndex(), created);
+    return created;
+  }
+
+  /**
+   * Create a new anti-affine request for the specific role.
+   * <p>
+   * Every candidate node is first verified with {@code node.canHost(role, label)};
+   * the request is then added to {@link #openRequests}.
+   * <p>
+   * This does not update the node instance's role's request count.
+   * @param role role index
+   * @param nodes list of suitable nodes; must not be empty
+   * @param label label to use
+   * @return a new request
+   * @throws IllegalArgumentException if {@code nodes} is empty
+   * @throws IllegalStateException if any node cannot host the role with the
+   * given label
+   */
+  public synchronized OutstandingRequest newAARequest(int role,
+      List<NodeInstance> nodes,
+      String label) {
+    Preconditions.checkArgument(!nodes.isEmpty());
+    // safety check to verify the allocation will hold
+    for (NodeInstance node : nodes) {
+      // Guava's Preconditions templates only substitute %s; a %d placeholder
+      // is left verbatim and the arguments end up misplaced in the message.
+      Preconditions.checkState(node.canHost(role, label),
+          "Cannot allocate role ID %s to node %s", role, node);
+    }
+    OutstandingRequest request = new OutstandingRequest(role, nodes);
+    openRequests.add(request);
+    return request;
+  }
+
+  /**
+   * Find the placed request, if any, registered for a (role, hostname) pair.
+   * @param role role index
+   * @param hostname hostname; must not be null
+   * @return the matching entry of {@link #placedRequests}, or null if there
+   * is none
+   */
+  @VisibleForTesting
+  public synchronized OutstandingRequest lookupPlacedRequest(int role, String hostname) {
+    Preconditions.checkArgument(hostname != null, "null hostname");
+    final RoleHostnamePair key = new RoleHostnamePair(role, hostname);
+    return placedRequests.get(key);
+  }
+
+  /**
+   * Remove a request from the {@link #placedRequests} structure.
+   * <p>
+   * The lookup uses the request's own index key (the same key under which
+   * {@link #newRequest(NodeInstance, int)} stores placed requests), so it does
+   * not depend on the request object itself comparing equal to a map key.
+   * @param request matching request to find; null is treated as "no match"
+   * @return the request or null for no match in the {@link #placedRequests}
+   */
+  @VisibleForTesting
+  public synchronized OutstandingRequest removePlacedRequest(OutstandingRequest request) {
+    if (request == null) {
+      // a null argument can never match an indexed request
+      return null;
+    }
+    return placedRequests.remove(request.getIndex());
+  }
+
+  /**
+   * Notification that a container has been allocated.
+   *
+   * <ol>
+   *   <li>Remove the matching (role, hostname) entry from the
+   *   {@link #placedRequests} structure; if there is none, search
+   *   {@link #openRequests} for a request matching the container.</li>
+   *   <li>Mark any matched request as completed and derive the outcome:
+   *   escalated or placed for a placed request, open for an open request,
+   *   unallocated when nothing matched.</li>
+   *   <li>If the matched request has an issued request, add its
+   *   cancellation operation to the results.</li>
+   * </ol>
+   *
+   * @param role role index
+   * @param hostname hostname
+   * @param container the allocated container
+   * @return the allocation outcome, with the originating request (possibly
+   * null) and any operations to execute
+   */
+  public synchronized ContainerAllocationResults onContainerAllocated(int role,
+      String hostname,
+      Container container) {
+    final String containerDetails = SliderUtils.containerToString(container);
+    log.debug("Processing allocation for role {} on {}", role,
+        containerDetails);
+    ContainerAllocationResults allocation = new ContainerAllocationResults();
+    ContainerAllocationOutcome outcome;
+    // look up with the map's own key type, matching how lookupPlacedRequest()
+    // and newRequest() address entries
+    OutstandingRequest request =
+        placedRequests.remove(new RoleHostnamePair(role, hostname));
+    if (request != null) {
+      //satisfied request
+      log.debug("Found oustanding placed request for container: {}", request);
+      request.completed();
+      // derive outcome from status of tracked request
+      outcome = request.isEscalated()
+          ? ContainerAllocationOutcome.Escalated
+          : ContainerAllocationOutcome.Placed;
+    } else {
+      // not in the list; this is an open placement
+      // scan through all containers in the open request list
+      request = removeOpenRequest(container);
+      if (request != null) {
+        log.debug("Found open outstanding request for container: {}", request);
+        request.completed();
+        outcome = ContainerAllocationOutcome.Open;
+      } else {
+        log.warn("No oustanding request found for container {}, outstanding queue has {} entries ",
+            containerDetails,
+            openRequests.size());
+        outcome = ContainerAllocationOutcome.Unallocated;
+      }
+    }
+    if (request != null && request.getIssuedRequest() != null) {
+      allocation.operations.add(request.createCancelOperation());
+    } else {
+      // reached both when no request matched at all and when the matched
+      // request has no issued request to cancel.
+      // rather than try to recover from it inelegantly, (and cause more confusion),
+      // log the event, but otherwise continue
+      log.warn("Unexpected allocation of container {}", containerDetails);
+    }
+
+    allocation.origin = request;
+    allocation.outcome = outcome;
+    return allocation;
+  }
+
+ /**
+ * Find and remove an open request. Determine it by scanning open requests
+ * for one whose priority & resource requirements match that of the container
+ * allocated.
+ * @param container container allocated
+ * @return a request which matches the allocation, or null for "no match"
+ */
+ private OutstandingRequest removeOpenRequest(Container container) {
+ int pri = container.getPriority().getPriority();
+ Resource resource = container.getResource();
+ OutstandingRequest request = null;
+ ListIterator<OutstandingRequest> openlist = openRequests.listIterator();
+ while (openlist.hasNext() && request == null) {
+ OutstandingRequest r = openlist.next();
+ if (r.getPriority() == pri) {
+ // matching resource
+ if (r.resourceRequirementsMatch(resource)) {
+ // match of priority and resources
+ request = r;
+ openlist.remove();
+ } else {
+ log.debug("Matched priorities but resources different");
+ }
+ }
+ }
+ return request;
+ }
+
+  /**
+   * Comparator ordering containers by role and then by how recently the
+   * hosting node was used for that role, so that if a choice is made of which
+   * (potentially surplus) containers to use, the most recent one is picked
+   * first. This operation <i>does not</i> change the role history, though it
+   * queries it.
+   */
+  static class newerThan implements Comparator<Container>, Serializable {
+    /** Role history queried for the node instance hosting a container. */
+    private final RoleHistory rh;
+
+    public newerThan(RoleHistory rh) {
+      this.rh = rh;
+    }
+
+    /**
+     * Get the last-used value recorded for the container's role on the node
+     * hosting the container.
+     * @param c container
+     * @return the last-used value of the node entry, or 0 if the role history
+     * has no node instance for the container or the node has no entry for
+     * the role
+     */
+    private long getAgeOf(Container c) {
+      long age = 0;
+      NodeInstance node = rh.getExistingNodeInstance(c);
+      int role = ContainerPriority.extractRole(c);
+      if (node != null) {
+        NodeEntry nodeEntry = node.get(role);
+        if (nodeEntry != null) {
+          age = nodeEntry.getLastUsed();
+        }
+      }
+      return age;
+    }
+
+    /**
+     * Comparator: order by ascending role; within a role, the container
+     * whose node has the larger last-used value sorts first.
+     * @param c1 container 1
+     * @param c2 container 2
+     * @return a negative value if c1 sorts before c2 (lower role, or same
+     * role and larger last-used value), a positive value if c1 sorts after
+     * c2, and 0 if role and last-used value are both equal
+     */
+    @Override
+    public int compare(Container c1, Container c2) {
+      int byRole = Integer.compare(ContainerPriority.extractRole(c1),
+          ContainerPriority.extractRole(c2));
+      if (byRole != 0) {
+        return byRole;
+      }
+      // arguments reversed on purpose: larger last-used value comes first
+      return Long.compare(getAgeOf(c2), getAgeOf(c1));
+    }
+  }
+
+  /**
+   * Take a list of allocated containers and split them into those for which
+   * a placed request is outstanding on the container's (role, hostname) and
+   * those for which there is none. This is to give requested hosts priority
+   * in container assignments if more come back than expected.
+   * <p>
+   * Note that {@code inAllocated} is sorted in place (with {@link newerThan})
+   * before it is partitioned.
+   * @param rh RoleHistory instance
+   * @param inAllocated the list of allocated containers; reordered by this call
+   * @param outPlaceRequested initially empty list of requested locations
+   * @param outUnplaced initially empty list of unrequested hosts
+   */
+  public synchronized void partitionRequests(RoleHistory rh,
+      List<Container> inAllocated,
+      List<Container> outPlaceRequested,
+      List<Container> outUnplaced) {
+    Collections.sort(inAllocated, new newerThan(rh));
+    for (Container container : inAllocated) {
+      int role = ContainerPriority.extractRole(container);
+      String hostname = RoleHistoryUtils.hostnameOf(container);
+      // probe with the map's own key type, as lookupPlacedRequest() does
+      if (placedRequests.containsKey(new RoleHostnamePair(role, hostname))) {
+        outPlaceRequested.add(container);
+      } else {
+        outUnplaced.add(container);
+      }
+    }
+  }
+
+
+  /**
+   * Drop every outstanding request, placed or open, for a role.
+   * <p>
+   * Placed requests of the role are removed, marked as completed and their
+   * node recorded in the result; open requests of the role are simply removed.
+   *
+   * @param role role to cancel
+   * @return possibly empty list of the nodes of the removed placed requests
+   */
+  public synchronized List<NodeInstance> resetOutstandingRequests(int role) {
+    final List<NodeInstance> released = new ArrayList<>();
+
+    // placed requests: remove, complete, and remember the node
+    for (Iterator<OutstandingRequest> placed = placedRequests.values().iterator();
+        placed.hasNext();) {
+      OutstandingRequest candidate = placed.next();
+      if (candidate.roleId != role) {
+        continue;
+      }
+      placed.remove();
+      candidate.completed();
+      released.add(candidate.node);
+    }
+
+    // open requests: just remove
+    for (Iterator<OutstandingRequest> open = openRequests.iterator();
+        open.hasNext();) {
+      if (open.next().roleId == role) {
+        open.remove();
+      }
+    }
+    return released;
+  }
+
+  /**
+   * Snapshot the placed requests. The returned list is a new list, but the
+   * request objects in it are shared with this tracker.
+   * @return a list of the current placed requests
+   */
+  public synchronized List<OutstandingRequest> listPlacedRequests() {
+    final List<OutstandingRequest> snapshot = new ArrayList<>(placedRequests.size());
+    snapshot.addAll(placedRequests.values());
+    return snapshot;
+  }
+
+  /**
+   * Snapshot the open requests. The returned list is a new list, but the
+   * request objects in it are shared with this tracker.
+   * @return a list of the current open requests
+   */
+  public synchronized List<OutstandingRequest> listOpenRequests() {
+    final List<OutstandingRequest> snapshot = new ArrayList<>(openRequests.size());
+    snapshot.addAll(openRequests);
+    return snapshot;
+  }
+
+  /**
+   * Escalate operation as triggered by external timer: for every placed
+   * request which should escalate at the given time, emit a cancellation of
+   * the request followed by the escalated container request.
+   * @param now the time against which each request is checked
+   * @return a (usually empty) list of cancel/request operations.
+   */
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+  public synchronized List<AbstractRMOperation> escalateOutstandingRequests(long now) {
+    if (placedRequests.isEmpty()) {
+      return NO_REQUESTS;
+    }
+
+    final List<AbstractRMOperation> ops = new ArrayList<>();
+    for (OutstandingRequest placed : placedRequests.values()) {
+      // hold the request's lock across check and escalation so that its
+      // state cannot change in between
+      synchronized (placed) {
+        if (!placed.shouldEscalate(now)) {
+          continue;
+        }
+        // cancel first, then re-issue in escalated form
+        ops.add(placed.createCancelOperation());
+        ops.add(new ContainerRequestOperation(placed.escalate()));
+      }
+    }
+    return ops;
+  }
+
+  /**
+   * Cancel all outstanding anti-affine requests, placed and open alike,
+   * removing them from this tracker's request structures.
+   *
+   * This does not remove them from the role status; they must be reset
+   * by the caller.
+   *
+   * @return the cancel operations, one per anti-affine request removed
+   */
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+  public synchronized List<AbstractRMOperation> cancelOutstandingAARequests() {
+
+    log.debug("Looking for AA request to cancel");
+    final List<AbstractRMOperation> cancellations = new ArrayList<>();
+
+    // phase one: placed requests. Collect the keys while scanning and cull
+    // them from the map only once the scan has finished.
+    final List<RoleHostnamePair> antiAffineKeys = new ArrayList<>(placedRequests.size());
+    for (Map.Entry<RoleHostnamePair, OutstandingRequest> placed : placedRequests.entrySet()) {
+      final OutstandingRequest candidate = placed.getValue();
+      synchronized (candidate) {
+        if (!candidate.isAntiAffine()) {
+          continue;
+        }
+        cancellations.add(candidate.createCancelOperation());
+        antiAffineKeys.add(placed.getKey());
+      }
+    }
+    placedRequests.keySet().removeAll(antiAffineKeys);
+
+    // phase two: open requests, removed as they are found
+    final Iterator<OutstandingRequest> open = openRequests.iterator();
+    while (open.hasNext()) {
+      final OutstandingRequest candidate = open.next();
+      synchronized (candidate) {
+        if (!candidate.isAntiAffine()) {
+          continue;
+        }
+        cancellations.add(candidate.createCancelOperation());
+        open.remove();
+      }
+    }
+    log.info("Cancelling {} outstanding AA requests", cancellations.size());
+
+    return cancellations;
+  }
+
+ /**
+ * Extract a specific number of open requests for a role
+ * @param roleId role Id
+ * @param count count to extract
+ * @return a list of requests which are no longer in the open request list
+ */
+ public synchronized List<OutstandingRequest> extractOpenRequestsForRole(int roleId, int count) {
+ List<OutstandingRequest> results = new ArrayList<>();
+ ListIterator<OutstandingRequest> openlist = openRequests.listIterator();
+ while (openlist.hasNext() && count > 0) {
+ OutstandingRequest openRequest = openlist.next();
+ if (openRequest.roleId == roleId) {
+ results.add(openRequest);
+ openlist.remove();
+ count--;
+ }
+ }
+ return results;
+ }
+
+  /**
+   * Extract a specific number of placed requests for a role.
+   * <p>
+   * Matching entries are removed through the map iterator while scanning, so
+   * removal does not depend on a request object comparing equal to its
+   * {@link RoleHostnamePair} key.
+   * @param roleId role Id
+   * @param count maximum number of requests to extract
+   * @return a list of requests which are no longer in the placed request data structure
+   */
+  public synchronized List<OutstandingRequest> extractPlacedRequestsForRole(int roleId, int count) {
+    List<OutstandingRequest> results = new ArrayList<>();
+    Iterator<Map.Entry<RoleHostnamePair, OutstandingRequest>>
+        iterator = placedRequests.entrySet().iterator();
+    while (iterator.hasNext() && count > 0) {
+      OutstandingRequest request = iterator.next().getValue();
+      if (request.roleId == roleId) {
+        results.add(request);
+        // cull it from the map as part of the same pass
+        iterator.remove();
+        count--;
+      }
+    }
+
+    return results;
+  }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org