You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/25 18:49:22 UTC

[79/89] [abbrv] flink git commit: [FLINK-4373] [cluster management] Introduce SlotID, AllocationID, ResourceProfile

[FLINK-4373] [cluster management] Introduce SlotID, AllocationID, ResourceProfile

[FLINK-4373] [cluster management] address comments

This closes #2370.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/baf4a616
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/baf4a616
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/baf4a616

Branch: refs/heads/flip-6
Commit: baf4a616905e4ba15974511abc39993dda307f2b
Parents: 946ea09
Author: Kurt Young <yk...@gmail.com>
Authored: Fri Aug 12 11:05:48 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 25 20:21:03 2016 +0200

----------------------------------------------------------------------
 .../clusterframework/types/AllocationID.java    | 32 ++++++++
 .../clusterframework/types/ResourceProfile.java | 68 ++++++++++++++++
 .../runtime/clusterframework/types/SlotID.java  | 83 ++++++++++++++++++++
 .../types/ResourceProfileTest.java              | 49 ++++++++++++
 4 files changed, 232 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/baf4a616/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
new file mode 100644
index 0000000..f7ae6ee
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import org.apache.flink.util.AbstractID;
+
+/**
+ * Unique identifier for an attempt to allocate a slot. It is normally created by the JobManager when requesting
+ * a slot and stays constant across re-tries. It can also be used to identify responses from the ResourceManager
+ * and to identify deployment calls towards the TaskManager that the slot was allocated from.
+ */
+public class AllocationID extends AbstractID {
+
+	private static final long serialVersionUID = 1L;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/baf4a616/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
new file mode 100644
index 0000000..cbe709f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import java.io.Serializable;
+
+/**
+ * Describes the resource profile of a slot, either when requiring or offering it. A profile can be checked
+ * for whether it matches another profile's requirement. A matching score, to decide which profile to choose
+ * among many candidate slots, may be calculated on top of this later; no such score is computed in this class.
+ */
+public class ResourceProfile implements Serializable {
+
+	private static final long serialVersionUID = -784900073893060124L;
+
+	/** How many cpu cores are needed; a double is used so that fractions such as 0.1 can be specified */
+	private final double cpuCores;
+
+	/** How much memory is needed, in MB */
+	private final long memoryInMB;
+
+	public ResourceProfile(double cpuCores, long memoryInMB) {
+		this.cpuCores = cpuCores;
+		this.memoryInMB = memoryInMB;
+	}
+
+	/**
+	 * Gets the cpu cores needed.
+	 * @return The cpu cores, 1.0 means a full cpu thread
+	 */
+	public double getCpuCores() {
+		return cpuCores;
+	}
+
+	/**
+	 * Gets the memory needed, in MB.
+	 * @return The memory in MB
+	 */
+	public long getMemoryInMB() {
+		return memoryInMB;
+	}
+
+	/**
+	 * Checks whether this profile has at least as many cpu cores and at least as much memory as required.
+	 *
+	 * @param required the required resource profile; it is dereferenced without a null check
+	 * @return true if the requirement is matched, otherwise false
+	 */
+	public boolean isMatching(ResourceProfile required) {
+		return Double.compare(cpuCores, required.getCpuCores()) >= 0 && memoryInMB >= required.getMemoryInMB();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/baf4a616/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
new file mode 100644
index 0000000..d1b072d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Unique identifier for a slot that is located in a TaskManager.
+ */
+public class SlotID implements ResourceIDRetrievable, Serializable {
+
+	private static final long serialVersionUID = -6399206032549807771L;
+
+	/** The id of the resource on which this slot is located */
+	private final ResourceID resourceId;
+
+	/** The numeric id of the individual slot */
+	private final int slotId;
+
+	public SlotID(ResourceID resourceId, int slotId) {
+		this.resourceId = checkNotNull(resourceId, "ResourceID must not be null");
+		this.slotId = slotId;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public ResourceID getResourceID() {
+		return resourceId;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		SlotID slotID = (SlotID) o;
+
+		if (slotId != slotID.slotId) {
+			return false;
+		}
+		return resourceId.equals(slotID.resourceId);
+	}
+
+	@Override
+	public int hashCode() {
+		int result = resourceId.hashCode();
+		result = 31 * result + slotId;
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return "SlotID{" +
+			"resourceId=" + resourceId +
+			", slotId=" + slotId +
+			'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/baf4a616/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
new file mode 100644
index 0000000..bc5ddaa
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ResourceProfileTest {
+
+	@Test
+	public void testMatchRequirement() throws Exception {
+		ResourceProfile rp1 = new ResourceProfile(1.0, 100); // 1 core, 100 MB
+		ResourceProfile rp2 = new ResourceProfile(1.0, 200); // 1 core, 200 MB
+		ResourceProfile rp3 = new ResourceProfile(2.0, 100); // 2 cores, 100 MB
+		ResourceProfile rp4 = new ResourceProfile(2.0, 200); // 2 cores, 200 MB
+
+		assertFalse(rp1.isMatching(rp2)); // same cores, but too little memory
+		assertTrue(rp2.isMatching(rp1)); // same cores, more memory than required
+
+		assertFalse(rp1.isMatching(rp3)); // same memory, but too few cores
+		assertTrue(rp3.isMatching(rp1)); // same memory, more cores than required
+
+		assertFalse(rp2.isMatching(rp3)); // enough memory, but too few cores
+		assertFalse(rp3.isMatching(rp2)); // enough cores, but too little memory
+
+		assertTrue(rp4.isMatching(rp1)); // more cores and more memory
+		assertTrue(rp4.isMatching(rp2)); // more cores, equal memory
+		assertTrue(rp4.isMatching(rp3)); // equal cores, more memory
+		assertTrue(rp4.isMatching(rp4)); // a profile matches itself
+	}
+}