Posted to commits@flink.apache.org by rm...@apache.org on 2015/01/23 18:43:56 UTC

[3/6] flink git commit: [FLINK-1295][FLINK-883] Allow to deploy 'job only' YARN cluster. Add tests to YARN

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
new file mode 100644
index 0000000..47ce782
--- /dev/null
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn
+
+import java.util.Date
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+
+object Messages {
+  case class YarnMessage(message: String, date: Date = new Date())
+  case class ApplicationMasterStatus(numTaskManagers: Int, numSlots: Int)
+  case object RegisterClient
+
+  case class StopYarnSession(status: FinalApplicationStatus)
+  case object JobManagerStopped
+  case class StartYarnSession(configuration: Configuration, actorSystemPort: Int)
+
+  case object PollContainerCompletion
+  case object PollYarnClusterStatus // see org.apache.flink.runtime.yarn.FlinkYarnClusterStatus for
+                                    // the response
+  case object CheckForUserCommand
+
+  // Client-local messages
+  case class LocalRegisterClient(jobManagerAddress: String)
+  case object LocalGetYarnMessage // request new message
+  case object LocalGetYarnClusterStatus // request the latest cluster status
+}
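
A minimal sketch of how a client-side actor could use these messages: it registers itself as the message listener via RegisterClient, buffers YarnMessage updates from the ApplicationMaster, and answers the client-local requests. ClientBridge and its field names are hypothetical and not part of this commit.

    import akka.actor.{Actor, ActorRef}
    import org.apache.flink.yarn.Messages._

    class ClientBridge(jobManager: ActorRef) extends Actor {
      // YarnMessage updates pushed by the ApplicationMaster are buffered here (newest first).
      var yarnMessages = List.empty[YarnMessage]

      override def preStart(): Unit = {
        // Announce this actor as the message listener on the ApplicationMaster.
        jobManager ! RegisterClient
      }

      def receive = {
        case msg: YarnMessage =>
          yarnMessages = msg :: yarnMessages
        case LocalGetYarnMessage =>
          // Hand out the oldest buffered message, if any.
          sender() ! yarnMessages.lastOption
          yarnMessages = yarnMessages.dropRight(1)
        case LocalGetYarnClusterStatus =>
          // Forward the status request to the ApplicationMaster.
          jobManager forward PollYarnClusterStatus
      }
    }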

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
new file mode 100644
index 0000000..7e9570e
--- /dev/null
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn
+
+import java.io.{IOException, File}
+import java.nio.ByteBuffer
+import java.util.Collections
+
+import akka.actor.ActorRef
+import org.apache.flink.configuration.ConfigConstants
+import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.jobmanager.{WithWebServer, JobManager}
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus
+import org.apache.flink.yarn.Messages._
+import org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.{NMClient, AMRMClient}
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.util.Records
+
+import scala.concurrent.duration._
+
+
+trait YarnJobManager extends ActorLogMessages {
+  that: JobManager with WithWebServer =>
+
+  import context._
+  import scala.collection.JavaConverters._
+
+  val ALLOCATION_DELAY = 100 milliseconds
+  val COMPLETION_DELAY = 5 seconds
+
+  var rmClientOption: Option[AMRMClient[ContainerRequest]] = None
+  var nmClientOption: Option[NMClient] = None
+  var messageListener: Option[ActorRef] = None
+  var containerLaunchContext: Option[ContainerLaunchContext] = None
+
+  var allocatedContainers = 0
+  var completedContainers = 0
+  var numTaskManager = 0
+
+
+  abstract override def receiveWithLogMessages: Receive = {
+    receiveYarnMessages orElse super.receiveWithLogMessages
+  }
+
+  def receiveYarnMessages: Receive = {
+    case StopYarnSession(status) =>
+      log.info("Stopping YARN Session.")
+
+      instanceManager.getAllRegisteredInstances.asScala foreach {
+        instance =>
+          instance.getTaskManager ! StopYarnSession(status)
+      }
+
+      rmClientOption foreach {
+        rmClient =>
+          rmClient.unregisterApplicationMaster(status, "", "")
+          rmClient.close()
+      }
+
+      rmClientOption = None
+
+      nmClientOption foreach {
+        _.close()
+      }
+
+      nmClientOption = None
+      messageListener foreach {
+        _ ! JobManagerStopped
+      }
+      context.system.shutdown()
+
+    case RegisterClient =>
+      messageListener = Some(sender())
+
+    case PollYarnClusterStatus =>
+      sender() ! new FlinkYarnClusterStatus(instanceManager.getNumberOfRegisteredTaskManagers,
+        instanceManager.getTotalNumberOfSlots)
+
+    case StartYarnSession(conf, actorSystemPort: Int) => {
+      log.info("Start yarn session.")
+      val memoryPerTaskManager = env.get(FlinkYarnClient.ENV_TM_MEMORY).toInt
+      val heapLimit = Utils.calculateHeapSize(memoryPerTaskManager)
+
+      val applicationMasterHost = env.get(Environment.NM_HOST.key)
+      require(applicationMasterHost != null, s"Application master (${Environment.NM_HOST}) not set.")
+
+      numTaskManager = env.get(FlinkYarnClient.ENV_TM_COUNT).toInt
+      log.info(s"Requesting ${numTaskManager} task managers.")
+
+      val remoteFlinkJarPath = env.get(FlinkYarnClient.FLINK_JAR_PATH)
+      val fs = FileSystem.get(conf)
+      val appId = env.get(FlinkYarnClient.ENV_APP_ID)
+      val currDir = env.get(Environment.PWD.key())
+      val clientHomeDir = env.get(FlinkYarnClient.ENV_CLIENT_HOME_DIR)
+      val shipListString = env.get(FlinkYarnClient.ENV_CLIENT_SHIP_FILES)
+      val yarnClientUsername = env.get(FlinkYarnClient.ENV_CLIENT_USERNAME)
+
+      val jobManagerWebPort = that.webServer.getServer.getConnectors()(0).getLocalPort
+
+      val rm = AMRMClient.createAMRMClient[ContainerRequest]()
+      rm.init(conf)
+      rm.start()
+
+      rmClientOption = Some(rm)
+
+      val nm = NMClient.createNMClient()
+      nm.init(conf)
+      nm.start()
+      nm.cleanupRunningContainersOnStop(true)
+
+      nmClientOption = Some(nm)
+
+      // Register with ResourceManager
+      val url = s"http://$applicationMasterHost:$jobManagerWebPort"
+      log.info(s"Registering ApplicationMaster with tracking url $url.")
+      rm.registerApplicationMaster(applicationMasterHost, actorSystemPort, url)
+
+
+      // Priority for worker containers - priorities are intra-application
+      val priority = Records.newRecord(classOf[Priority])
+      priority.setPriority(0)
+
+      // Resource requirements for worker containers
+      val capability = Records.newRecord(classOf[Resource])
+      capability.setMemory(memoryPerTaskManager)
+      capability.setVirtualCores(1) // hard-code that number (YARN is not accounting for CPUs)
+
+      // Make container requests to ResourceManager
+      for (i <- 0 until numTaskManager) {
+        val containerRequest = new ContainerRequest(capability, null, null, priority)
+        log.info(s"Requesting TaskManager container $i.")
+        rm.addContainerRequest(containerRequest)
+      }
+
+      val flinkJar = Records.newRecord(classOf[LocalResource])
+      val flinkConf = Records.newRecord(classOf[LocalResource])
+
+      // register Flink Jar with remote HDFS
+      val remoteJarPath = new Path(remoteFlinkJarPath)
+      Utils.registerLocalResource(fs, remoteJarPath, flinkJar)
+
+      // register conf with local fs
+      Utils.setupLocalResource(conf, fs, appId, new Path(s"file://$currDir/flink-conf-modified" +
+        s".yaml"), flinkConf, new Path(clientHomeDir))
+      log.info(s"Prepared local resource for modified yaml: $flinkConf")
+
+      val hasLogback = new File(s"$currDir/logback.xml").exists()
+      val hasLog4j = new File(s"$currDir/log4j.properties").exists()
+
+      // prepare files to be shipped
+      val resources = shipListString.split(",") flatMap {
+        pathStr =>
+          if (pathStr.isEmpty) {
+            None
+          } else {
+            val resource = Records.newRecord(classOf[LocalResource])
+            val path = new Path(pathStr)
+            Utils.registerLocalResource(fs, path, resource)
+            Some((path.getName, resource))
+          }
+      } toList
+
+      val taskManagerLocalResources = ("flink.jar", flinkJar) ::("flink-conf.yaml",
+        flinkConf) :: resources toMap
+
+      allocatedContainers = 0
+      completedContainers = 0
+
+      containerLaunchContext = Some(createContainerLaunchContext(heapLimit, hasLogback, hasLog4j,
+        yarnClientUsername, conf, taskManagerLocalResources))
+
+      context.system.scheduler.scheduleOnce(ALLOCATION_DELAY, self, PollContainerCompletion)
+    }
+
+    case PollContainerCompletion => {
+      rmClientOption match {
+        case Some(rmClient) => {
+          val response = rmClient.allocate(completedContainers.toFloat / numTaskManager)
+
+          for (container <- response.getAllocatedContainers.asScala) {
+            log.info(s"Got new container for TM ${container.getId} on host ${
+              container.getNodeId.getHost}")
+
+            allocatedContainers += 1
+
+            log.info(s"Launching container #$allocatedContainers.")
+            nmClientOption match {
+              case Some(nmClient) => {
+                containerLaunchContext match {
+                  case Some(ctx) => nmClient.startContainer(container, ctx)
+                  case None => {
+                    log.error("The ContainerLaunchContext was not set.")
+                    self ! StopYarnSession(FinalApplicationStatus.FAILED)
+                  }
+                }
+              }
+              case None => {
+                log.error("The NMClient was not set.")
+                self ! StopYarnSession(FinalApplicationStatus.FAILED)
+              }
+            }
+          }
+
+          for (status <- response.getCompletedContainersStatuses.asScala) {
+            completedContainers += 1
+            log.info(s"Completed container ${status.getContainerId}. Total completed " +
+              s"$completedContainers.")
+            log.info(s"Diagnostics ${status.getDiagnostics}.")
+
+            messageListener foreach {
+              _ ! YarnMessage(s"Diagnostics for containerID=${status.getContainerId} in " +
+                s"state=${status.getState}.\n${status.getDiagnostics}")
+            }
+          }
+
+          if (allocatedContainers < numTaskManager) {
+            context.system.scheduler.scheduleOnce(ALLOCATION_DELAY, self, PollContainerCompletion)
+          } else if (completedContainers < numTaskManager) {
+            context.system.scheduler.scheduleOnce(COMPLETION_DELAY, self, PollContainerCompletion)
+          } else {
+            self ! StopYarnSession(FinalApplicationStatus.FAILED)
+          }
+        }
+        case None => {
+          log.error("The AMRMClient was not set.")
+          self ! StopYarnSession(FinalApplicationStatus.FAILED)
+        }
+      }
+    }
+  }
+
+  def createContainerLaunchContext(heapLimit: Int, hasLogback: Boolean, hasLog4j: Boolean,
+                                   yarnClientUsername: String, yarnConf: Configuration,
+                                   taskManagerLocalResources: Map[String, LocalResource]):
+  ContainerLaunchContext = {
+    log.info("Create container launch context.")
+    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
+
+    val javaOpts = configuration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
+    val tmCommand = new StringBuilder(s"$$JAVA_HOME/bin/java -Xmx${heapLimit}m $javaOpts")
+
+    if (hasLogback || hasLog4j) {
+      tmCommand ++=
+        s""" -Dlog.file="${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager.log""""
+    }
+
+    if (hasLogback) {
+      tmCommand ++= s" -Dlogback.configurationFile=file:logback.xml"
+    }
+
+    if (hasLog4j) {
+      tmCommand ++= s" -Dlog4j.configuration=file:log4j.properties"
+    }
+
+    tmCommand ++= s" ${classOf[YarnTaskManagerRunner].getName} --configDir . 1> " +
+      s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager-stdout.log 2> " +
+      s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager-stderr.log"
+
+    ctx.setCommands(Collections.singletonList(tmCommand.toString()))
+
+    log.info(s"Starting TM with command=${tmCommand.toString()}")
+
+    ctx.setLocalResources(taskManagerLocalResources.asJava)
+
+    // Setup classpath for container ( = TaskManager )
+    val containerEnv = new java.util.HashMap[String, String]()
+    Utils.setupEnv(yarnConf, containerEnv)
+    containerEnv.put(FlinkYarnClient.ENV_CLIENT_USERNAME, yarnClientUsername)
+    ctx.setEnvironment(containerEnv)
+
+    val user = UserGroupInformation.getCurrentUser
+
+    try {
+      val credentials = user.getCredentials
+      val dob = new DataOutputBuffer()
+      credentials.writeTokenStorageToStream(dob)
+      val securityTokens = ByteBuffer.wrap(dob.getData, 0, dob.getLength)
+      ctx.setTokens(securityTokens)
+    } catch {
+      case e: IOException =>
+        log.warning("Getting current user info failed when trying to launch the container", e)
+    }
+
+    ctx
+  }
+
+  def env = System.getenv()
+}
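
The PollContainerCompletion handler above re-schedules itself until all requested containers have been allocated, then keeps watching for (unexpected) completions. A sketch of that decision as a pure function, using the ALLOCATION_DELAY and COMPLETION_DELAY values defined above; PollDecisionSketch and its names are illustrative only, not part of the commit.

    import scala.concurrent.duration._

    object PollDecisionSketch {
      sealed trait NextPoll
      case class ScheduleIn(delay: FiniteDuration) extends NextPoll
      case object AllContainersDone extends NextPoll

      def nextPoll(allocated: Int, completed: Int, requested: Int): NextPoll =
        if (allocated < requested) {
          ScheduleIn(100.milliseconds) // ALLOCATION_DELAY: keep polling the ResourceManager
        } else if (completed < requested) {
          ScheduleIn(5.seconds)        // COMPLETION_DELAY: all allocated, watch for completions
        } else {
          AllContainersDone            // every requested container terminated -> stop the session
        }
    }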

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
new file mode 100644
index 0000000..b596946
--- /dev/null
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn
+
+import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.yarn.Messages.StopYarnSession
+
+trait YarnTaskManager extends ActorLogMessages {
+  that: TaskManager =>
+
+  abstract override def receiveWithLogMessages: Receive = {
+    receiveYarnMessages orElse super.receiveWithLogMessages
+  }
+
+  def receiveYarnMessages: Receive = {
+    case StopYarnSession(status) => {
+      log.info(s"Stopping YARN TaskManager with final application status $status")
+      context.system.shutdown()
+    }
+  }
+}
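
YarnTaskManager (like YarnJobManager above) relies on Scala's stackable-trait pattern: an abstract override handler tries its own cases first and hands everything else to super. A self-contained toy illustration of that mechanism — not Flink code.

    object StackableTraitDemo extends App {
      trait Handler {
        def handle(msg: String): String
      }

      class Base extends Handler {
        def handle(msg: String): String = s"base($msg)"
      }

      trait YarnMixin extends Handler {
        abstract override def handle(msg: String): String = msg match {
          case "stop" => "yarn-stop"          // intercepted by the mixin
          case other  => super.handle(other)  // everything else falls through to the base
        }
      }

      // Mixed in at instantiation time, like `new TaskManager(...) with YarnTaskManager`:
      val handler = new Base with YarnMixin
      println(handler.handle("stop"))   // prints: yarn-stop
      println(handler.handle("other"))  // prints: base(other)
    }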

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
new file mode 100644
index 0000000..185190d
--- /dev/null
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn
+
+import akka.actor.{Props, ActorRef, ActorSystem}
+import com.typesafe.config.ConfigFactory
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.taskmanager.TaskManager
+import org.slf4j.LoggerFactory
+
+object YarnUtils {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  def createActorSystem(hostname: String, port: Int, configuration: Configuration): ActorSystem = {
+    val akkaConfig = ConfigFactory.parseString(AkkaUtils.getConfigString(hostname, port,
+      configuration) + getConfigString)
+
+    AkkaUtils.createActorSystem(akkaConfig)
+  }
+
+  def createActorSystem(): ActorSystem = {
+    val akkaConfig = ConfigFactory.parseString(AkkaUtils.getDefaultActorSystemConfigString +
+      getConfigString)
+
+    AkkaUtils.createActorSystem(akkaConfig)
+  }
+
+  def getConfigString: String = {
+    """
+    |akka{
+    |  loglevel = "DEBUG"
+    |  stdout-loglevel = "DEBUG"
+    |  log-dead-letters-during-shutdown = off
+    |  log-dead-letters = off
+    |
+    |  actor {
+    |    provider = "akka.remote.RemoteActorRefProvider"
+    |  }
+    |
+    |  remote{
+    |    log-remote-lifecycle-events = off
+    |
+    |    netty{
+    |      tcp{
+    |        transport-class = "akka.remote.transport.netty.NettyTransport"
+    |        tcp-nodelay = on
+    |        maximum-frame-size = 1MB
+    |        execution-pool-size = 4
+    |      }
+    |    }
+    |  }
+    |}""".stripMargin
+  }
+
+  def startActorSystemAndTaskManager(args: Array[String]): (ActorSystem, ActorRef) = {
+    val (hostname, port, config) = TaskManager.parseArgs(args)
+
+    val actorSystem = createActorSystem(hostname, port, config)
+
+    val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration) =
+      TaskManager.parseConfiguration(hostname, config, false)
+
+    (actorSystem, TaskManager.startActor(Props(new TaskManager(connectionInfo, jobManagerURL,
+      taskManagerConfig, networkConnectionConfiguration) with YarnTaskManager))(actorSystem))
+  }
+}
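
getConfigString is appended to the base Akka configuration as a raw string before parsing. A sketch of the equivalent merge with Typesafe Config's withFallback, shown only to clarify which settings the YARN actors add; ConfigMergeSketch is illustrative, not part of the commit.

    import com.typesafe.config.{Config, ConfigFactory}

    object ConfigMergeSketch extends App {
      // The overrides getConfigString contributes, written as HOCON paths.
      val yarnOverrides: Config = ConfigFactory.parseString(
        """
          |akka.loglevel = "DEBUG"
          |akka.stdout-loglevel = "DEBUG"
          |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
          |akka.remote.netty.tcp.maximum-frame-size = 1MB
        """.stripMargin)

      // A base config to merge with; entries in yarnOverrides take precedence.
      val base: Config = ConfigFactory.parseString("akka.loglevel = INFO")
      val merged: Config = yarnOverrides.withFallback(base)

      println(merged.getString("akka.loglevel")) // prints: DEBUG
    }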

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTests.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTests.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTests.java
new file mode 100644
index 0000000..c8d639a
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTests.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class UtilsTests {
+
+	@Test
+	public void testHeapCutoff() {
+
+		// ASSUMES DEFAULT Configuration values.
+		Assert.assertEquals(800, Utils.calculateHeapSize(1000) );
+		Assert.assertEquals(9500, Utils.calculateHeapSize(10000) );
+	}
+}
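
The two expected values are consistent with cutting a fixed fraction of the container memory for non-heap overhead, bounded by an absolute cap. A sketch of that arithmetic; the ratio and cap below are assumptions chosen to match the assertions, since Utils.calculateHeapSize itself is not part of this diff.

    object HeapCutoffSketch {
      // ASSUMED defaults: cutoff ratio 0.2, cap 500 MB (not read from the Flink configuration).
      def heapSizeSketch(containerMemoryMB: Int,
                         cutoffRatio: Double = 0.2,
                         cutoffCapMB: Int = 500): Int = {
        val cutoff = math.min((containerMemoryMB * cutoffRatio).toInt, cutoffCapMB)
        containerMemoryMB - cutoff
      }

      // heapSizeSketch(1000)  == 1000  - min(200, 500)  == 800   (first assertion)
      // heapSizeSketch(10000) == 10000 - min(2000, 500) == 9500  (second assertion)
    }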

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bcff76f..e792a3a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -349,7 +349,8 @@ under the License.
 					</exclusion>
 				</exclusions>
 			</dependency>
-			<!-- YARN -->
+
+			<!-- Hadoop 2 Dependencies -->
 			<dependency>
 				<groupId>org.apache.hadoop</groupId>
 				<artifactId>hadoop-common</artifactId>
@@ -573,6 +574,23 @@ under the License.
 					</exclusion>
 				</exclusions>
 			</dependency>
+			<dependency>
+				<groupId>org.apache.hadoop</groupId>
+				<artifactId>hadoop-yarn-common</artifactId>
+				<version>${hadoop.version}</version>
+			</dependency>
+			<dependency>
+				<groupId>org.apache.hadoop</groupId>
+				<artifactId>hadoop-yarn-server-tests</artifactId>
+				<scope>test</scope>
+				<version>${hadoop.version}</version>
+			</dependency>
+			<dependency>
+				<groupId>org.apache.hadoop</groupId>
+				<artifactId>hadoop-minicluster</artifactId>
+				<scope>test</scope>
+				<version>${hadoop.version}</version>
+			</dependency>
 		</dependencies>
 	</dependencyManagement>
 
@@ -603,6 +621,19 @@ under the License.
 		</profile>
 
 		<profile>
+			<id>include-yarn</id>
+			<activation>
+				<property>
+				<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
+				<!--hadoop2--><name>!hadoop.profile</name>
+				</property>
+			</activation>
+			<modules>
+				<module>flink-yarn</module>
+				<module>flink-yarn-tests</module>
+			</modules>
+		</profile>
+		<profile>
 			<id>hadoop-2.0.0-alpha</id>
 			<activation>
 				<property>