You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2011/10/11 22:00:47 UTC

svn commit: r1182058 - in /incubator/hama/branches/HamaV2/server: pom.xml src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java src/main/java/org/apache/hama/bsp/JobImpl.java

Author: tjungblut
Date: Tue Oct 11 20:00:46 2011
New Revision: 1182058

URL: http://svn.apache.org/viewvc?rev=1182058&view=rev
Log:
Fixed the build: the server module now produces a packaged jar which can be used to run complete YARN BSP jobs.

Modified:
    incubator/hama/branches/HamaV2/server/pom.xml
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java

Modified: incubator/hama/branches/HamaV2/server/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/pom.xml?rev=1182058&r1=1182057&r2=1182058&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/pom.xml (original)
+++ incubator/hama/branches/HamaV2/server/pom.xml Tue Oct 11 20:00:46 2011
@@ -1,181 +1,135 @@
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	You under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
-  <parent>
-    <groupId>org.apache.hama</groupId>
-    <artifactId>hama-parent</artifactId>
-    <version>0.4.0-incubating-SNAPSHOT</version>
-  </parent>
-
-  <modelVersion>4.0.0</modelVersion>
-  <groupId>org.apache.hama</groupId>
-  <artifactId>hama-server</artifactId>
-  <name>Apache Hama Server</name>
-  <version>0.4.0-incubating-SNAPSHOT</version>
-  <packaging>jar</packaging>
-  <scm>
-    <connection>
+	<parent>
+		<groupId>org.apache.hama</groupId>
+		<artifactId>hama-parent</artifactId>
+		<version>0.4.0-incubating-SNAPSHOT</version>
+	</parent>
+
+	<modelVersion>4.0.0</modelVersion>
+	<groupId>org.apache.hama</groupId>
+	<artifactId>hama-server</artifactId>
+	<name>Apache Hama Server</name>
+	<version>0.4.0-incubating-SNAPSHOT</version>
+	<packaging>jar</packaging>
+	<scm>
+		<connection>
       scm:svn:http://svn.apache.org/repos/asf/incubator/hama/server
     </connection>
-    <developerConnection>
+		<developerConnection>
       scm:svn:https://svn.apache.org/repos/asf/incubator/hama/server
     </developerConnection>
-  </scm>
+	</scm>
 
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.hama</groupId>
-      <artifactId>hama-core</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hama</groupId>
-      <artifactId>hama-api</artifactId>
-      <version>${project.version}</version>
-    </dependency>  
-    <dependency>
-      <groupId>commons-logging</groupId>
-      <artifactId>commons-logging</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-      <version>2.4.0a</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro</artifactId>
-      <version>1.5.3</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.ant</groupId>
-          <artifactId>ant</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.jboss.netty</groupId>
-          <artifactId>netty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.velocity</groupId>
-          <artifactId>velocity</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <artifactId>paranamer-ant</artifactId>
-          <groupId>com.thoughtworks.paranamer</groupId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <version>0.23.0-SNAPSHOT</version>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>commons-el</groupId>
-          <artifactId>commons-el</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>tomcat</groupId>
-          <artifactId>jasper-runtime</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>tomcat</groupId>
-          <artifactId>jasper-compiler</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jsp-2.1-jetty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>hsqldb</groupId>
-          <artifactId>hsqldb</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-api</artifactId>
-      <version>0.23.0-SNAPSHOT</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-common</artifactId>
-      <version>0.23.0-SNAPSHOT</version>
-      <scope>provided</scope>
-    </dependency>
-<!--
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-nodemanager</artifactId>
-      <version>0.23.0-SNAPSHOT</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-resourcemanager</artifactId>
-      <version>0.23.0-SNAPSHOT</version>
-      <scope>provided</scope>
-    </dependency>
--->
-    <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper</artifactId>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-dependency-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>copy-dependencies</id>
-            <phase>package</phase>
-            <goals>
-              <goal>copy-dependencies</goal>
-            </goals>
-            <configuration>
-              <outputDirectory>${basedir}/target/lib</outputDirectory>
-              <overWriteReleases>false</overWriteReleases>
-              <overWriteSnapshots>true</overWriteSnapshots>
-              <excludeTransitive>true</excludeTransitive>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-    <finalName>hama-server-${project.version}</finalName>
-  </build>
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.hama</groupId>
+			<artifactId>hama-core</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-core</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hama</groupId>
+			<artifactId>hama-api</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-core</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<dependency>
+			<groupId>commons-logging</groupId>
+			<artifactId>commons-logging</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.google.protobuf</groupId>
+			<artifactId>protobuf-java</artifactId>
+			<version>2.4.0a</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+			<version>1.5.3</version>
+		</dependency>
+
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<version>0.23.0-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-yarn-api</artifactId>
+			<version>0.23.0-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-yarn-common</artifactId>
+			<version>0.23.0-SNAPSHOT</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.zookeeper</groupId>
+			<artifactId>zookeeper</artifactId>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>copy-dependencies</id>
+						<phase>package</phase>
+						<goals>
+							<goal>copy-dependencies</goal>
+						</goals>
+						<configuration>
+							<outputDirectory>${basedir}/target/lib</outputDirectory>
+							<overWriteReleases>false</overWriteReleases>
+							<overWriteSnapshots>true</overWriteSnapshots>
+							<excludeTransitive>true</excludeTransitive>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>1.4</version>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+		<finalName>hama-server-${project.version}</finalName>
+	</build>
 </project>

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java?rev=1182058&r1=1182057&r2=1182058&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java Tue Oct 11 20:00:46 2011
@@ -115,7 +115,7 @@ public class BSPTaskLauncher implements 
 
     ctx.setCommands(Arrays.asList("${JAVA_HOME}" + "/bin/java -cp './package/*' ",
         BSPTaskLauncher.class.getCanonicalName(), jobId.getJtIdentifier(), id
-            + "", this.jobFile.makeQualified(FileSystem.get(conf)).toString(),
+            + "", this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString(),
         " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", " 2>"
             + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
     ctx.setLocalResources(Collections.singletonMap("package", packageResource));

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java?rev=1182058&r1=1182057&r2=1182058&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java Tue Oct 11 20:00:46 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.hama.bsp;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus;
 
@@ -74,6 +76,8 @@ public class JobImpl implements Job {
   private ExecutorCompletionService<BSPTaskStatus> completionService = new ExecutorCompletionService<BSPTaskStatus>(
       threadPool);
   private Map<Integer, BSPTaskLauncher> launchers = new HashMap<Integer, BSPTaskLauncher>();
+  private int lastResponseID = 0;
+  private Resource availableResources;
 
   public JobImpl(ApplicationAttemptId appAttemptId,
       Configuration jobConfiguration, YarnRPC yarnRPC, AMRMProtocol amrmRPC,
@@ -131,36 +135,20 @@ public class JobImpl implements Job {
   @Override
   public JobState startJob() throws YarnRemoteException, InterruptedException,
       ExecutionException {
-
-    ResourceRequest request = createBSPTaskRequest(getTotalBSPTasks(),
-        taskMemoryInMb, priority);
-
-    AllocateRequest req = Records.newRecord(AllocateRequest.class);
-    // response id zero because this is our initial allocation
-    req.setResponseId(0);
-    // set ApplicationAttemptId
-    req.setApplicationAttemptId(appAttemptId);
-    // add our task request
-    req.addAsk(request);
-    // always an empty list
-    req.addAllReleases(releasedContainers);
-    // we don't have a real progress, so it is always zero
-    req.setProgress(0);
+    
+    AllocateRequest req = BuilderUtils.newAllocateRequest(
+        appAttemptId, lastResponseID, 0.0f,createBSPTaskRequest(getTotalBSPTasks(),
+            taskMemoryInMb, priority), releasedContainers);
 
     AllocateResponse allocateResponse = resourceManager.allocate(req);
     AMResponse amResponse = allocateResponse.getAMResponse();
     LOG.info("Got response! ID: " + amResponse.getResponseId()
         + " with num of containers: "
-        + amResponse.getAllocatedContainers().size());
-    // somehow the response id is always incremented
-    if (amResponse.getResponseId() == 1) {
-      this.allocatedContainers = amResponse.getAllocatedContainers();
-    } else {
-      LOG.error("Response IDs somehow did not match. Got: "
-          + amResponse.getResponseId() + " where it should be 1 (one).");
-      state = JobState.FAILED;
-      return state;
-    }
+        + amResponse.getAllocatedContainers().size() + " and following resources: " + amResponse.getAvailableResources().getMemory()+ "mb");
+    this.lastResponseID = amResponse.getResponseId();
+    
+    this.availableResources = amResponse.getAvailableResources();
+    this.allocatedContainers = amResponse.getAllocatedContainers();
 
     int launchedBSPTasks = 0;
 
@@ -220,32 +208,37 @@ public class JobImpl implements Job {
     threadPool.shutdownNow();
   }
 
-  private ResourceRequest createBSPTaskRequest(int numBSPTasks, int memoryInMb,
-      int priority) {
-    // Resource Request
-    ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);
-
-    // setup requirements for hosts
-    // whether a particular rack/host is needed
-    // useful for applications that are sensitive
-    // to data locality
-    rsrcRequest.setHostName("*");
-
-    // set the priority for the request
-    Priority pri = Records.newRecord(Priority.class);
-    pri.setPriority(priority);
-    rsrcRequest.setPriority(pri);
-
-    // Set up resource type requirements
-    // For now, only memory is supported so we set memory requirements
-    Resource capability = Records.newRecord(Resource.class);
-    capability.setMemory(memoryInMb);
-    rsrcRequest.setCapability(capability);
-
-    // set no. of containers needed
-    // matching the specifications
-    rsrcRequest.setNumContainers(numBSPTasks);
-    return rsrcRequest;
+  private List<ResourceRequest> createBSPTaskRequest(int numTasks,
+      int memoryInMb, int priority) {
+    // Builds numTasks identically configured requests and returns them as a list.
+    List<ResourceRequest> reqList = new ArrayList<ResourceRequest>(numTasks);
+    for (int i = 0; i < numTasks; i++) {
+      // Fresh request record for this iteration
+      ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);
+
+      // Host requirement: "*" is passed as the host name, so no particular
+      // rack/host is named here (presumably a wildcard - confirm against YARN).
+      // Naming a concrete rack/host would matter for applications that are
+      // sensitive to data locality.
+      rsrcRequest.setHostName("*");
+
+      // Wrap the int priority in a Priority record and attach it to the request
+      Priority pri = Records.newRecord(Priority.class);
+      pri.setPriority(priority);
+      rsrcRequest.setPriority(pri);
+
+      // Resource requirements: memory is the only capability set on the record,
+      // taken from memoryInMb (unit per the parameter name: megabytes).
+      Resource capability = Records.newRecord(Resource.class);
+      capability.setMemory(memoryInMb);
+      rsrcRequest.setCapability(capability);
+
+      // NOTE(review): the container count comes from numBSPTasks (not declared in
+      // this method, presumably a field), not the numTasks parameter - confirm intent.
+      rsrcRequest.setNumContainers(numBSPTasks);
+      reqList.add(rsrcRequest);
+    }
+    return reqList;
   }
 
   @Override