You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2011/10/11 22:00:47 UTC
svn commit: r1182058 - in /incubator/hama/branches/HamaV2/server: pom.xml
src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
src/main/java/org/apache/hama/bsp/JobImpl.java
Author: tjungblut
Date: Tue Oct 11 20:00:46 2011
New Revision: 1182058
URL: http://svn.apache.org/viewvc?rev=1182058&view=rev
Log:
fixed build- server module now has a packaged jar which can be used to run complete YARN bsps
Modified:
incubator/hama/branches/HamaV2/server/pom.xml
incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java
Modified: incubator/hama/branches/HamaV2/server/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/pom.xml?rev=1182058&r1=1182057&r2=1182058&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/pom.xml (original)
+++ incubator/hama/branches/HamaV2/server/pom.xml Tue Oct 11 20:00:46 2011
@@ -1,181 +1,135 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <parent>
- <groupId>org.apache.hama</groupId>
- <artifactId>hama-parent</artifactId>
- <version>0.4.0-incubating-SNAPSHOT</version>
- </parent>
-
- <modelVersion>4.0.0</modelVersion>
- <groupId>org.apache.hama</groupId>
- <artifactId>hama-server</artifactId>
- <name>Apache Hama Server</name>
- <version>0.4.0-incubating-SNAPSHOT</version>
- <packaging>jar</packaging>
- <scm>
- <connection>
+ <parent>
+ <groupId>org.apache.hama</groupId>
+ <artifactId>hama-parent</artifactId>
+ <version>0.4.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.hama</groupId>
+ <artifactId>hama-server</artifactId>
+ <name>Apache Hama Server</name>
+ <version>0.4.0-incubating-SNAPSHOT</version>
+ <packaging>jar</packaging>
+ <scm>
+ <connection>
scm:svn:http://svn.apache.org/repos/asf/incubator/hama/server
</connection>
- <developerConnection>
+ <developerConnection>
scm:svn:https://svn.apache.org/repos/asf/incubator/hama/server
</developerConnection>
- </scm>
+ </scm>
- <dependencies>
- <dependency>
- <groupId>org.apache.hama</groupId>
- <artifactId>hama-core</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hama</groupId>
- <artifactId>hama-api</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>2.4.0a</version>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- <version>1.5.3</version>
- <exclusions>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.ant</groupId>
- <artifactId>ant</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.velocity</groupId>
- <artifactId>velocity</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- <exclusion>
- <artifactId>paranamer-ant</artifactId>
- <groupId>com.thoughtworks.paranamer</groupId>
- </exclusion>
- </exclusions>
- </dependency>
-
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>0.23.0-SNAPSHOT</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>commons-el</groupId>
- <artifactId>commons-el</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-2.1-jetty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>hsqldb</groupId>
- <artifactId>hsqldb</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- <version>0.23.0-SNAPSHOT</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-common</artifactId>
- <version>0.23.0-SNAPSHOT</version>
- <scope>provided</scope>
- </dependency>
-<!--
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-nodemanager</artifactId>
- <version>0.23.0-SNAPSHOT</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-resourcemanager</artifactId>
- <version>0.23.0-SNAPSHOT</version>
- <scope>provided</scope>
- </dependency>
--->
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <executions>
- <execution>
- <id>copy-dependencies</id>
- <phase>package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>${basedir}/target/lib</outputDirectory>
- <overWriteReleases>false</overWriteReleases>
- <overWriteSnapshots>true</overWriteSnapshots>
- <excludeTransitive>true</excludeTransitive>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- <finalName>hama-server-${project.version}</finalName>
- </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hama</groupId>
+ <artifactId>hama-core</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hama</groupId>
+ <artifactId>hama-api</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>2.4.0a</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.5.3</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>0.23.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <version>0.23.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <version>0.23.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${basedir}/target/lib</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>true</overWriteSnapshots>
+ <excludeTransitive>true</excludeTransitive>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>1.4</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <finalName>hama-server-${project.version}</finalName>
+ </build>
</project>
Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java?rev=1182058&r1=1182057&r2=1182058&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java Tue Oct 11 20:00:46 2011
@@ -115,7 +115,7 @@ public class BSPTaskLauncher implements
ctx.setCommands(Arrays.asList("${JAVA_HOME}" + "/bin/java -cp './package/*' ",
BSPTaskLauncher.class.getCanonicalName(), jobId.getJtIdentifier(), id
- + "", this.jobFile.makeQualified(FileSystem.get(conf)).toString(),
+ + "", this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString(),
" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", " 2>"
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
ctx.setLocalResources(Collections.singletonMap("package", packageResource));
Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java?rev=1182058&r1=1182057&r2=1182058&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java Tue Oct 11 20:00:46 2011
@@ -17,6 +17,7 @@
*/
package org.apache.hama.bsp;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus;
@@ -74,6 +76,8 @@ public class JobImpl implements Job {
private ExecutorCompletionService<BSPTaskStatus> completionService = new ExecutorCompletionService<BSPTaskStatus>(
threadPool);
private Map<Integer, BSPTaskLauncher> launchers = new HashMap<Integer, BSPTaskLauncher>();
+ private int lastResponseID = 0;
+ private Resource availableResources;
public JobImpl(ApplicationAttemptId appAttemptId,
Configuration jobConfiguration, YarnRPC yarnRPC, AMRMProtocol amrmRPC,
@@ -131,36 +135,20 @@ public class JobImpl implements Job {
@Override
public JobState startJob() throws YarnRemoteException, InterruptedException,
ExecutionException {
-
- ResourceRequest request = createBSPTaskRequest(getTotalBSPTasks(),
- taskMemoryInMb, priority);
-
- AllocateRequest req = Records.newRecord(AllocateRequest.class);
- // response id zero because this is our initial allocation
- req.setResponseId(0);
- // set ApplicationAttemptId
- req.setApplicationAttemptId(appAttemptId);
- // add our task request
- req.addAsk(request);
- // always an empty list
- req.addAllReleases(releasedContainers);
- // we don't have a real progress, so it is always zero
- req.setProgress(0);
+
+ AllocateRequest req = BuilderUtils.newAllocateRequest(
+ appAttemptId, lastResponseID, 0.0f,createBSPTaskRequest(getTotalBSPTasks(),
+ taskMemoryInMb, priority), releasedContainers);
AllocateResponse allocateResponse = resourceManager.allocate(req);
AMResponse amResponse = allocateResponse.getAMResponse();
LOG.info("Got response! ID: " + amResponse.getResponseId()
+ " with num of containers: "
- + amResponse.getAllocatedContainers().size());
- // somehow the response id is always incremented
- if (amResponse.getResponseId() == 1) {
- this.allocatedContainers = amResponse.getAllocatedContainers();
- } else {
- LOG.error("Response IDs somehow did not match. Got: "
- + amResponse.getResponseId() + " where it should be 1 (one).");
- state = JobState.FAILED;
- return state;
- }
+ + amResponse.getAllocatedContainers().size() + " and following resources: " + amResponse.getAvailableResources().getMemory()+ "mb");
+ this.lastResponseID = amResponse.getResponseId();
+
+ this.availableResources = amResponse.getAvailableResources();
+ this.allocatedContainers = amResponse.getAllocatedContainers();
int launchedBSPTasks = 0;
@@ -220,32 +208,37 @@ public class JobImpl implements Job {
threadPool.shutdownNow();
}
- private ResourceRequest createBSPTaskRequest(int numBSPTasks, int memoryInMb,
- int priority) {
- // Resource Request
- ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);
-
- // setup requirements for hosts
- // whether a particular rack/host is needed
- // useful for applications that are sensitive
- // to data locality
- rsrcRequest.setHostName("*");
-
- // set the priority for the request
- Priority pri = Records.newRecord(Priority.class);
- pri.setPriority(priority);
- rsrcRequest.setPriority(pri);
-
- // Set up resource type requirements
- // For now, only memory is supported so we set memory requirements
- Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(memoryInMb);
- rsrcRequest.setCapability(capability);
-
- // set no. of containers needed
- // matching the specifications
- rsrcRequest.setNumContainers(numBSPTasks);
- return rsrcRequest;
+ private List<ResourceRequest> createBSPTaskRequest(int numTasks,
+ int memoryInMb, int priority) {
+
+ List<ResourceRequest> reqList = new ArrayList<ResourceRequest>(numTasks);
+ for (int i = 0; i < numTasks; i++) {
+ // Resource Request
+ ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);
+
+ // setup requirements for hosts
+ // whether a particular rack/host is needed
+ // useful for applications that are sensitive
+ // to data locality
+ rsrcRequest.setHostName("*");
+
+ // set the priority for the request
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(priority);
+ rsrcRequest.setPriority(pri);
+
+ // Set up resource type requirements
+ // For now, only memory is supported so we set memory requirements
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(memoryInMb);
+ rsrcRequest.setCapability(capability);
+
+ // set no. of containers needed
+ // matching the specifications
+ rsrcRequest.setNumContainers(numBSPTasks);
+ reqList.add(rsrcRequest);
+ }
+ return reqList;
}
@Override