You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/31 16:54:12 UTC

flink git commit: [FLINK-8330] [flip6] Remove YarnClusterClientV2

Repository: flink
Updated Branches:
  refs/heads/master ce62945ae -> 3fdee00e4


[FLINK-8330] [flip6] Remove YarnClusterClientV2

The YarnClusterClientV2 is no longer needed since we have removed FlinkYarnCLI.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fdee00e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fdee00e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fdee00e

Branch: refs/heads/master
Commit: 3fdee00e45480c5471e6dbe0d2cd006fdd046b75
Parents: ce62945
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Dec 31 17:35:24 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Dec 31 17:35:24 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/yarn/YarnClusterClientV2.java  | 151 -------------------
 .../flink/yarn/YarnClusterDescriptorV2.java     |   6 +-
 2 files changed, 2 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3fdee00e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
deleted file mode 100644
index d1af6dc..0000000
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.util.List;
-
-/**
- * Java representation of a running Flink job on YARN.
- * Since flip-6, a flink job will be run as a yarn job by default, each job has a jobmaster,
- * so this class will be used as a client to communicate with yarn and start the job on yarn.
- */
-public class YarnClusterClientV2 extends ClusterClient {
-
-	private static final Logger LOG = LoggerFactory.getLogger(YarnClusterClientV2.class);
-
-	private YarnClient yarnClient;
-
-	private final AbstractYarnClusterDescriptor clusterDescriptor;
-
-	private ApplicationId appId;
-
-	private String trackingURL;
-
-	/**
-	 * Create a client to communicate with YARN cluster.
-	 *
-	 * @param clusterDescriptor The descriptor used to create yarn job
-	 * @param flinkConfig Flink configuration
-	 * @throws Exception if the cluster client could not be created
-	 */
-	public YarnClusterClientV2(
-			final AbstractYarnClusterDescriptor clusterDescriptor,
-			org.apache.flink.configuration.Configuration flinkConfig) throws Exception {
-
-		super(flinkConfig);
-
-		this.clusterDescriptor = clusterDescriptor;
-		this.yarnClient = clusterDescriptor.getYarnClient();
-		this.trackingURL = "";
-	}
-
-	@Override
-	public org.apache.flink.configuration.Configuration getFlinkConfiguration() {
-		return flinkConfig;
-	}
-
-	@Override
-	public int getMaxSlots() {
-        // No need not set max slot
-		return 0;
-	}
-
-	@Override
-	public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
-		return clusterDescriptor.hasUserJarFiles(userJarFiles);
-	}
-
-	@Override
-	protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
-		throw new UnsupportedOperationException("Not yet implemented.");
-	}
-
-	@Override
-	public String getWebInterfaceURL() {
-		// there seems to be a difference between HD 2.2.0 and 2.6.0
-		if (!trackingURL.startsWith("http://")) {
-			return "http://" + trackingURL;
-		} else {
-			return trackingURL;
-		}
-	}
-
-	@Override
-	public String getClusterIdentifier() {
-		return "Yarn cluster with application id " + getApplicationId();
-	}
-
-	/**
-	 * This method is only available if the cluster hasn't been started in detached mode.
-	 */
-	@Override
-	public GetClusterStatusResponse getClusterStatus() {
-		throw new UnsupportedOperationException("Not support getClusterStatus since Flip-6.");
-	}
-
-	public ApplicationStatus getApplicationStatus() {
-		//TODO: this method is useful for later
-		return null;
-	}
-
-	@Override
-	public List<String> getNewMessages() {
-		throw new UnsupportedOperationException("Not support getNewMessages since Flip-6.");
-	}
-
-	@Override
-	public void finalizeCluster() {
-		// Do nothing
-	}
-
-	@Override
-	public boolean isDetached() {
-		return super.isDetached() || clusterDescriptor.isDetachedMode();
-	}
-
-	@Override
-	public void waitForClusterToBeReady() {
-		throw new UnsupportedOperationException("Not support waitForClusterToBeReady since Flip-6.");
-	}
-
-	@Override
-	public InetSocketAddress getJobManagerAddress() {
-		//TODO: just return a local address in order to be compatible with createClient in CliFrontend
-		return new InetSocketAddress("localhost", 0);
-	}
-
-	public ApplicationId getApplicationId() {
-		return appId;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdee00e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
index 3e58da5..ed04523 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
@@ -23,10 +23,8 @@ import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
 import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
 
 /**
- * Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the new application master for a job under flip-6.
- * This implementation is now however tricky, since YarnClusterDescriptorV2 is related YarnClusterClientV2, but AbstractYarnClusterDescriptor is related
- * to YarnClusterClient. We should let YarnClusterDescriptorV2 implements ClusterDescriptor&lt;YarnClusterClientV2&gt;.
- * However, in order to use the code in AbstractYarnClusterDescriptor for setting environments and so on, we make YarnClusterDescriptorV2 as now.
+ * Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the
+ * new application master for a job under flip-6.
  */
 public class YarnClusterDescriptorV2 extends AbstractYarnClusterDescriptor {