You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/30 21:15:13 UTC

[flink] 10/11: [hotfix][coordination] Remove unused class ExecutionJobVertexCoordinatorContext

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d046fea0019bfc112ae90b42a31ecfecf4887e16
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri May 29 21:46:36 2020 +0200

    [hotfix][coordination] Remove unused class ExecutionJobVertexCoordinatorContext
    
    This class was left over from a prior refactoring.
---
 .../ExecutionJobVertexCoordinatorContext.java      | 85 ----------------------
 1 file changed, 85 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexCoordinatorContext.java
deleted file mode 100644
index c9e99de..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexCoordinatorContext.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
-import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.SerializedValue;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * An implementation of the {@link OperatorCoordinator.Context} that delegates call to an
- * {@link ExecutionJobVertex}.
- */
-final class ExecutionJobVertexCoordinatorContext implements OperatorCoordinator.Context {
-
-	private final OperatorID operatorId;
-
-	private final ExecutionJobVertex jobVertex;
-
-	ExecutionJobVertexCoordinatorContext(OperatorID operatorId, ExecutionJobVertex jobVertex) {
-		this.operatorId = operatorId;
-		this.jobVertex = jobVertex;
-	}
-
-	@Override
-	public OperatorID getOperatorId() {
-		return operatorId;
-	}
-
-	@Override
-	public CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt, int targetSubtask) {
-		final SerializedValue<OperatorEvent> serializedEvent;
-		try {
-			serializedEvent = new SerializedValue<>(evt);
-		}
-		catch (IOException e) {
-			// we do not expect that this exception is handled by the caller, so we make it
-			// unchecked so that it can bubble up
-			throw new FlinkRuntimeException("Cannot serialize operator event", e);
-		}
-
-		return getTaskExecution(targetSubtask).sendOperatorEvent(operatorId, serializedEvent);
-	}
-
-	@Override
-	public void failTask(int subtask, Throwable cause) {
-		final Execution taskExecution = getTaskExecution(subtask);
-		taskExecution.fail(cause);
-	}
-
-	@Override
-	public void failJob(Throwable cause) {
-		jobVertex.getGraph().failGlobal(cause);
-	}
-
-	@Override
-	public int currentParallelism() {
-		return jobVertex.getParallelism();
-	}
-
-	private Execution getTaskExecution(int subtask) {
-		return jobVertex.getTaskVertices()[subtask].getCurrentExecutionAttempt();
-	}
-}