Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/13 09:38:57 UTC

[GitHub] [flink] XComp commented on a change in pull request #13004: [FLINK-18719][runtime] Define interfaces for new active RMs and implement

XComp commented on a change in pull request #13004:
URL: https://github.com/apache/flink/pull/13004#discussion_r469728536



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemory.java
##########
@@ -69,4 +73,23 @@ public MemorySize getJvmDirectMemorySize() {
 	public MemorySize getTotalFlinkMemorySize() {
 		return jvmHeap.add(offHeapMemory);
 	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		} else if (obj instanceof JobManagerFlinkMemory) {
+			JobManagerFlinkMemory that = (JobManagerFlinkMemory) obj;
+			return Objects.equals(this.jvmHeap, that.jvmHeap) &&
+					Objects.equals(this. offHeapMemory, that.offHeapMemory);

Review comment:
       There's also a formatting problem here: the space between `this.` and `offHeapMemory`. Alternatively, `this.` could be removed entirely to be consistent with the `hashCode()` implementation.
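
       To illustrate the second option, here is a minimal sketch of an `equals`/`hashCode` pair that drops `this.` on both sides. `HeapAndOffHeap` is a hypothetical stand-in class with `long` fields, not the actual `JobManagerFlinkMemory`, and the `Objects.hash(...)` body is an assumption about what the existing `hashCode()` looks like.

```java
import java.util.Objects;

// Hypothetical stand-in for illustration only; not part of Flink.
final class HeapAndOffHeap {
	private final long jvmHeap;
	private final long offHeapMemory;

	HeapAndOffHeap(long jvmHeap, long offHeapMemory) {
		this.jvmHeap = jvmHeap;
		this.offHeapMemory = offHeapMemory;
	}

	@Override
	public boolean equals(Object obj) {
		if (obj == this) {
			return true;
		} else if (obj instanceof HeapAndOffHeap) {
			HeapAndOffHeap that = (HeapAndOffHeap) obj;
			// No `this.` prefix, mirroring the field access style used in hashCode().
			return Objects.equals(jvmHeap, that.jvmHeap) &&
					Objects.equals(offHeapMemory, that.offHeapMemory);
		}
		return false;
	}

	@Override
	public int hashCode() {
		// Assumed shape of the existing hashCode(): same fields, same order.
		return Objects.hash(jvmHeap, offHeapMemory);
	}
}
```

       Either style works; the point is only that both methods read the same fields in the same way.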

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/JvmMetaspaceAndOverhead.java
##########
@@ -49,4 +50,23 @@ public MemorySize getMetaspace() {
 	public MemorySize getOverhead() {
 		return overhead;
 	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		} else if (obj instanceof JvmMetaspaceAndOverhead ) {
+			JvmMetaspaceAndOverhead that = (JvmMetaspaceAndOverhead) obj;
+			return Objects.equals(this.metaspace, that.metaspace) &&
+					Objects.equals(this. overhead, that.overhead);

Review comment:
       Here as well: space between `this.` and `overhead` shouldn't be there.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/CommonProcessMemorySpec.java
##########
@@ -94,4 +96,23 @@ public MemorySize getTotalFlinkMemorySize() {
 	public MemorySize getTotalProcessMemorySize() {
 		return flinkMemory.getTotalFlinkMemorySize().add(getJvmMetaspaceSize()).add(getJvmOverheadSize());
 	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		} else if (obj instanceof CommonProcessMemorySpec<?> ) {
+			CommonProcessMemorySpec<?> that = (CommonProcessMemorySpec<?>) obj;
+			return Objects.equals(this.flinkMemory, that.flinkMemory) &&
+					Objects.equals(this. jvmMetaspaceAndOverhead, that.jvmMetaspaceAndOverhead);

Review comment:
       Here as well: the space between `this.` and `jvmMetaspaceAndOverhead` should be removed.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##########
@@ -1064,6 +1076,13 @@ public void handleError(final Exception exception) {
 	 */
 	protected abstract void initialize() throws ResourceManagerException;
 
+	/**
+	 * Terminates the framework specific components.
+	 *
+	 * @throws Exception with occurs during termination.
+	 */
+	protected abstract void terminate() throws Exception;

Review comment:
       Just as a side note: as discussed with @tillrohrmann, making `terminate` non-blocking would be a follow-up feature we might consider later on.
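
       For context, a rough sketch of what a non-blocking variant could look like, assuming it simply wraps the blocking call in a future. `TerminableComponent` and `terminateAsync()` are hypothetical names chosen for illustration; this is not a proposal for the actual `ResourceManager` API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical component for illustration only; not Flink's ResourceManager.
abstract class TerminableComponent {

	/** Blocking variant: returns only after the cleanup has finished. */
	protected abstract void terminate() throws Exception;

	/**
	 * Non-blocking variant: starts the cleanup on the common pool and returns immediately.
	 * The future completes normally on success, or exceptionally if terminate() throws.
	 */
	CompletableFuture<Void> terminateAsync() {
		return CompletableFuture.runAsync(() -> {
			try {
				terminate();
			} catch (Exception e) {
				// Runnable cannot throw checked exceptions, so wrap them.
				throw new CompletionException(e);
			}
		});
	}
}
```

       A real implementation would probably need to choose the executor deliberately instead of relying on the common pool.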

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/taskmanager/TaskExecutorFlinkMemory.java
##########
@@ -128,4 +130,31 @@ public MemorySize getJvmDirectMemorySize() {
 	public MemorySize getTotalFlinkMemorySize() {
 		return frameworkHeap.add(frameworkOffHeap).add(taskHeap).add(taskOffHeap).add(network).add(managed);
 	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		} else if (obj instanceof TaskExecutorFlinkMemory) {
+			TaskExecutorFlinkMemory that = (TaskExecutorFlinkMemory) obj;
+			return Objects.equals(this.frameworkHeap, that.frameworkHeap) &&
+					Objects.equals(this. frameworkOffHeap, that.frameworkOffHeap) &&
+					Objects.equals(this. taskHeap, that.taskHeap)&&
+					Objects.equals(this. taskOffHeap, that.taskOffHeap)&&
+					Objects.equals(this. network, that.network)&&
+					Objects.equals(this. managed, that.managed);

Review comment:
       Interesting: I would have thought that this would cause a compilation error, but it looks like the compiler can deal with spaces between `this.` and the member. I learned something new. :-) Anyway, the formatting should be fixed here by removing the spaces between `this.` and the member variable and adding a space before the `&&` operators.
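
       A small sketch of why this compiles: Java treats whitespace between tokens as insignificant, so `this. value` and `this.value` are the same expression. `WhitespaceDemo` is a hypothetical class written only to demonstrate this.

```java
// Hypothetical class for illustration only.
final class WhitespaceDemo {
	private final int value;

	WhitespaceDemo(int value) {
		this.value = value;
	}

	int spaced() {
		// Legal, but poorly formatted: whitespace after the dot is ignored by the compiler.
		return this. value;
	}

	int compact() {
		// Conventional formatting of the same field access.
		return this.value;
	}
}
```

       Both methods return the same field, so this is purely a style issue.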

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/TestingResourceManagerDriver.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Testing implementation of {@link ResourceManagerDriver}.
+ */
+public class TestingResourceManagerDriver implements ResourceManagerDriver<ResourceID> {
+
+	private final BiConsumerWithException<ResourceEventHandler<ResourceID>, Executor, Exception> initializeConsumer;
+	private final Supplier<CompletableFuture<Void>> terminateSupplier;
+	private final BiConsumerWithException<ApplicationStatus, String, Exception> deregisterApplicationConsumer;
+	private final Function<TaskExecutorProcessSpec, CompletableFuture<ResourceID>> requestResourceFunction;
+	private final Consumer<ResourceID> releaseResourceConsumer;
+
+	private TestingResourceManagerDriver(
+			final BiConsumerWithException<ResourceEventHandler<ResourceID>, Executor, Exception> initializeConsumer,
+			final Supplier<CompletableFuture<Void>> terminateSupplier,
+			final BiConsumerWithException<ApplicationStatus, String, Exception> deregisterApplicationConsumer,
+			final Function<TaskExecutorProcessSpec, CompletableFuture<ResourceID>> requestResourceFunction,
+			final Consumer<ResourceID> releaseResourceConsumer) {
+		this.initializeConsumer = Preconditions.checkNotNull(initializeConsumer);
+		this.terminateSupplier = Preconditions.checkNotNull(terminateSupplier);
+		this.deregisterApplicationConsumer = Preconditions.checkNotNull(deregisterApplicationConsumer);
+		this.requestResourceFunction = Preconditions.checkNotNull(requestResourceFunction);
+		this.releaseResourceConsumer = Preconditions.checkNotNull(releaseResourceConsumer);
+	}
+
+	@Override
+	public void initialize(ResourceEventHandler<ResourceID> resourceEventHandler, Executor mainThreadExecutor) throws Exception {
+		initializeConsumer.accept(resourceEventHandler, mainThreadExecutor);
+	}
+
+	@Override
+	public CompletableFuture<Void> terminate() {
+		return terminateSupplier.get();
+	}
+
+	@Override
+	public void deregisterApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) throws Exception {
+		deregisterApplicationConsumer.accept(finalStatus, optionalDiagnostics);
+	}
+
+	@Override
+	public CompletableFuture<ResourceID> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
+		return requestResourceFunction.apply(taskExecutorProcessSpec);
+	}
+
+	@Override
+	public void releaseResource(ResourceID worker) {
+		releaseResourceConsumer.accept(worker);
+	}
+
+	public static class Builder {
+		private BiConsumerWithException<ResourceEventHandler<ResourceID>, Executor, Exception> initializeConsumer =
+				(ignore1, ignore2) -> {};
+
+		private Supplier<CompletableFuture<Void>> terminateSupplier =
+				() -> CompletableFuture.completedFuture(null);
+
+		private BiConsumerWithException<ApplicationStatus, String, Exception> deregisterApplicationConsumer =
+				(ignore1, ignore2) -> {};
+
+		private Function<TaskExecutorProcessSpec, CompletableFuture<ResourceID>> requestResourceFunction =
+				(ignore) -> CompletableFuture.completedFuture(ResourceID.generate());
+
+		private Consumer<ResourceID> releaseResourceConsumer =
+				(ignore) -> {};
+
+		public Builder setInitializeConsumer(BiConsumerWithException<ResourceEventHandler<ResourceID>, Executor, Exception> initializeConsumer) {

Review comment:
       Checkstyle complains about missing Javadoc here.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ResourceManagerDriver.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * A {@link ResourceManagerDriver} is responsible for requesting and releasing resources from/to a particular external
+ * resource manager.
+ */
+public interface ResourceManagerDriver<WorkerType extends ResourceIDRetrievable> {
+
+	/**
+	 * Initialize the deployment specific components.
+	 *
+	 * @param resourceEventHandler Handler that handles resource events.
+	 * @param mainThreadExecutor Rpc main thread executor.
+	 */
+	void initialize(ResourceEventHandler<WorkerType> resourceEventHandler, Executor mainThreadExecutor) throws Exception;
+
+	/**
+	 * Terminate the deployment specific components.
+	 *
+	 * @return A future that will be completed successfully when the driver is terminated, or exceptionally if cannot be

Review comment:
       ```suggestion
   	 * @return A future that will be completed successfully when the driver is terminated, or exceptionally if it cannot be
   ```



