You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/02/17 00:42:19 UTC
svn commit: r1568862 - in
/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez:
TezJobControlCompiler.java TezOperPlan.java TezPlanContainer.java
TezResourceManager.java
Author: rohini
Date: Sun Feb 16 23:42:19 2014
New Revision: 1568862
URL: http://svn.apache.org/r1568862
Log:
PIG-3629: Implement STREAM in Tez - Fix issues with localization of resources (abain via rohini)
Modified:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java?rev=1568862&r1=1568861&r2=1568862&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java Sun Feb 16 23:42:19 2014
@@ -97,7 +97,7 @@ public class TezJobControlCompiler {
try {
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
localResources.putAll(planContainer.getLocalResources());
- localResources.putAll(tezPlan.getLocalExtraResources());
+ localResources.putAll(tezPlan.getExtraResources());
TezDAG tezDag = buildDAG(tezPlan, localResources);
return new TezJob(tezConf, tezDag, localResources);
} catch (Exception e) {
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1568862&r1=1568861&r2=1568862&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Sun Feb 16 23:42:19 2014
@@ -21,15 +21,14 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
+import java.net.URI;
+import java.net.URL;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.VisitorException;
@@ -42,7 +41,7 @@ public class TezOperPlan extends Operato
private static final long serialVersionUID = 1L;
- private Map<URL, Path> extraResources = new HashMap<URL, Path>();
+ private Map<String, Path> extraResources = new HashMap<String, Path>();
public TezOperPlan() {
}
@@ -61,73 +60,65 @@ public class TezOperPlan extends Operato
return baos.toString();
}
- // Add extra plan-specific local resources to HDFS
+ // Add extra plan-specific local resources from the source FS
public void addExtraResource(URL url) throws IOException {
- if (!extraResources.containsKey(url)) {
- Path pathInHDFS = TezResourceManager.addLocalResource(url);
- extraResources.put(url, pathInHDFS);
+ Path resourcePath = new Path(url.getFile());
+ String resourceName = resourcePath.getName();
+
+ if (!extraResources.containsKey(resourceName)) {
+ Path remoteFsPath = TezResourceManager.addTezResource(url);
+ extraResources.put(resourceName, remoteFsPath);
}
}
- // Add extra plan-specific local resources already present in HDFS
- public void addExtraResource(URL url, Path pathInHDFS) throws IOException {
- if (!extraResources.containsKey(url)) {
- TezResourceManager.addLocalResource(url, pathInHDFS);
- extraResources.put(url, pathInHDFS);
+ // Add extra plan-specific local resources already present in the remote FS
+ public void addExtraResource(String resourceName, Path remoteFsPath) throws IOException {
+ if (!extraResources.containsKey(resourceName)) {
+ TezResourceManager.addTezResource(resourceName, remoteFsPath);
+ extraResources.put(resourceName, remoteFsPath);
}
}
// Get the plan-specific resources
- public Map<String, LocalResource> getLocalExtraResources() throws Exception {
+ public Map<String, LocalResource> getExtraResources() throws Exception {
TezPOStreamVisitor streamVisitor = new TezPOStreamVisitor(this);
streamVisitor.visit();
// In a STREAM add the files specified in SHIP and CACHE
// as local resources for the plan.
- addFileResources(streamVisitor.getShipFiles());
- addHdfsResources(streamVisitor.getCacheFiles());
+ addShipResources(streamVisitor.getShipFiles());
+ addCacheResources(streamVisitor.getCacheFiles());
- Set<URL> resourceUrls = extraResources.keySet();
- return TezResourceManager.getTezResources(resourceUrls);
+ return TezResourceManager.getTezResources(extraResources.keySet());
}
- private void addFileResources(Set<String> fileNames) throws IOException {
- Set<URL> fileUrls = new HashSet<URL>();
-
+ // In the statement "SHIP('/home/foo')" we'll map the resource name foo to
+ // the file that has been copied to the staging directory in the remote FS.
+ private void addShipResources(Set<String> fileNames) throws IOException {
for (String fileName : fileNames) {
fileName = fileName.trim();
if (fileName.length() > 0) {
- fileUrls.add(ConverterUtils.getYarnUrlFromURI(new File(fileName).toURI()));
+ URL url = new File(fileName).toURI().toURL();
+ addExtraResource(url);
}
}
-
- for (URL url : fileUrls) {
- addExtraResource(url);
- }
}
// In the statement "CACHE('/input/data.txt#alias.txt')" we'll map the
- // URL 'hdfs:/input/data.txt#alias.txt' to the actual resource path in
- // HDFS at '/input/data.txt'.
- private void addHdfsResources(Set<String> fileNames) throws Exception {
- Map<URL, Path> resourceMap = new HashMap<URL, Path>();
-
+ // resource name alias.txt to the actual resource path in the remote FS
+ // at '/input/data.txt'.
+ private void addCacheResources(Set<String> fileNames) throws Exception {
for (String fileName : fileNames) {
fileName = fileName.trim();
if (fileName.length() > 0) {
- URL urlOnHDFS = ConverterUtils.getYarnUrlFromURI(new File(fileName).toURI());
- urlOnHDFS.setScheme("hdfs");
+ URI resourceURI = new URI(fileName);
+ String fragment = resourceURI.getFragment();
- // Get the path on HDFS without a fragment at the end
- int aliasIndex = fileName.indexOf("#");
- String path = (aliasIndex == -1) ? fileName : fileName.substring(0, aliasIndex);
- Path pathOnHDFS = new Path(path);
- resourceMap.put(urlOnHDFS, pathOnHDFS);
- }
- }
+ Path remoteFsPath = new Path(resourceURI.getPath());
+ String resourceName = (fragment != null && fragment.length() > 0) ? fragment : remoteFsPath.getName();
- for (Map.Entry<URL, Path> entry : resourceMap.entrySet()) {
- addExtraResource(entry.getKey(), entry.getValue());
+ addExtraResource(resourceName, remoteFsPath);
+ }
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java?rev=1568862&r1=1568861&r2=1568862&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java Sun Feb 16 23:42:19 2014
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.PrintStream;
+import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -27,8 +28,6 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
@@ -55,12 +54,12 @@ public class TezPlanContainer extends Op
// In MR Pig the extra jars and script jars get put in Distributed Cache, but
// in Tez we'll add them as local resources.
- for (java.net.URL jarUrl : pigContext.extraJars) {
- jarLists.add(ConverterUtils.getYarnUrlFromURI(jarUrl.toURI()));
+ for (URL jarUrl : pigContext.extraJars) {
+ jarLists.add(jarUrl);
}
for (String jarFile : pigContext.scriptJars) {
- jarLists.add(ConverterUtils.getYarnUrlFromURI(new File(jarFile).toURI()));
+ jarLists.add(new File(jarFile).toURI().toURL());
}
// Script files for non-Java UDF's are added to the Job.jar by the JarManager class,
@@ -68,7 +67,7 @@ public class TezPlanContainer extends Op
// to the GroovyScriptEngine (see JarManager.java for comments).
for (Map.Entry<String, File> scriptFile : pigContext.getScriptFiles().entrySet()) {
if (scriptFile.getKey().endsWith(".groovy")) {
- jarLists.add(ConverterUtils.getYarnUrlFromURI(scriptFile.getValue().toURI()));
+ jarLists.add(scriptFile.getValue().toURI().toURL());
}
}
@@ -87,21 +86,20 @@ public class TezPlanContainer extends Op
// avoid NPE.
continue;
}
-
- URL jarUrl = ConverterUtils.getYarnUrlFromURI(new File(jarName).toURI());
+ URL jarUrl = new File(jarName).toURI().toURL();
jarLists.add(jarUrl);
-
- // Streaming UDF's are not working under Hadoop 2 (PIG-3478), so don't bother adding
- // resources for them yet.
- // if ("StreamingUDF".equals(clazz.getSimpleName())) {
- // for (String fileName : StreamingUDF.getResourcesForJar()) {
- // jarLists.add(ConverterUtils.getYarnUrlFromURI(new File(fileName).toURI()));
- // }
- // }
}
}
- return TezResourceManager.getTezResources(jarLists);
+ // Streaming UDF's are not working under Hadoop 2 (PIG-3478), so don't bother adding
+ // resources for them yet.
+ // if ("StreamingUDF".equals(clazz.getSimpleName())) {
+ // for (String fileName : StreamingUDF.getResourcesForJar()) {
+ // jarLists.add(new File(fileName).toURI().toURL());
+ // }
+ // }
+
+ return TezResourceManager.addTezResources(jarLists);
}
public TezOperPlan getNextPlan(List<TezOperPlan> processedPlans) {
@@ -176,7 +174,7 @@ public class TezPlanContainer extends Op
split(planNode);
}
}
-
+
@Override
public String toString() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1568862&r1=1568861&r2=1568862&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Sun Feb 16 23:42:19 2014
@@ -20,7 +20,9 @@ package org.apache.pig.backend.hadoop.ex
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.net.URL;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -31,12 +33,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.pig.PigException;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.JarManager;
-import org.apache.pig.impl.util.Utils;
public class TezResourceManager {
private static Path stagingDir;
@@ -44,8 +43,7 @@ public class TezResourceManager {
private static Configuration conf;
private static URL bootStrapJar;
private static FileSystem remoteFs;
-
- public static Map<URL, Path> resources = new HashMap<URL, Path>();
+ public static Map<String, Path> resources = new HashMap<String, Path>();
public static URL getBootStrapJar() {
return bootStrapJar;
@@ -56,37 +54,45 @@ public class TezResourceManager {
TezResourceManager.pigContext = pigContext;
TezResourceManager.conf = conf;
String jar = JarManager.findContainingJar(org.apache.pig.Main.class);
- TezResourceManager.bootStrapJar = ConverterUtils.getYarnUrlFromURI(new File(jar).toURI());
+ TezResourceManager.bootStrapJar = new File(jar).toURI().toURL();
remoteFs = FileSystem.get(conf);
addBootStrapJar();
}
- public static Path addLocalResource(URL url) throws IOException {
- if (!"file".equals(url.getScheme())) {
- throw new PigException("This method should only be called with file:// resources");
- }
+ // Add files from the source FS as local resources. The resource name will
+ // be the same as the file name.
+ public static Path addTezResource(URL url) throws IOException {
+ Path resourcePath = new Path(url.getFile());
+ String resourceName = resourcePath.getName();
- if (resources.containsKey(url)) {
- return resources.get(url);
+ if (resources.containsKey(resourceName)) {
+ return resources.get(resourceName);
}
- java.net.URL javaUrl = new java.net.URL(url.getScheme(), url.getHost(), url.getFile());
- Path pathInHDFS = Utils.shipToHDFS(pigContext, conf, javaUrl);
- resources.put(url, pathInHDFS);
- return pathInHDFS;
+ // Ship the resource to the staging directory on the remote FS
+ Path remoteFsPath = remoteFs.makeQualified(new Path(stagingDir, resourceName));
+ remoteFs.copyFromLocalFile(resourcePath, remoteFsPath);
+ resources.put(resourceName, remoteFsPath);
+ return remoteFsPath;
}
- // Add files already present in HDFS as local resources. Allow the URL
- // to be different from the path to support resource aliasing in a CACHE
- // statement. See TezOperPlan::addHdfsResources for an example.
- public static void addLocalResource(URL url, Path pathInHDFS) throws IOException {
- if (!"hdfs".equals(url.getScheme())) {
- throw new PigException("This method should only be called with hdfs:// resources");
+ // Add files already present in the remote FS as local resources. Allow the
+ // resource name to be different from the file name to support resource
+ // aliasing in a CACHE statement (and to allow the same file to be aliased
+ // with multiple resource names).
+ public static void addTezResource(String resourceName, Path remoteFsPath) throws IOException {
+ if (!resources.containsKey(resourceName)) {
+ resources.put(resourceName, remoteFsPath);
}
+ }
- if (!resources.containsKey(url)) {
- resources.put(url, pathInHDFS);
+ public static Map<String, LocalResource> addTezResources(Set<URL> resources) throws Exception {
+ Set<String> resourceNames = new HashSet<String>();
+ for (URL url : resources) {
+ addTezResource(url);
+ resourceNames.add(new Path(url.getFile()).getName());
}
+ return getTezResources(resourceNames);
}
public static void addBootStrapJar() throws IOException {
@@ -100,51 +106,30 @@ public class TezResourceManager {
FileOutputStream fos = new FileOutputStream(jobJar);
JarManager.createBootStrapJar(fos, pigContext);
- // Ship the job.jar to the staging directory on hdfs
+ // Ship the job.jar to the staging directory on the remote FS
Path remoteJarPath = remoteFs.makeQualified(new Path(stagingDir, new Path(bootStrapJar.getFile()).getName()));
remoteFs.copyFromLocalFile(new Path(jobJar.getAbsolutePath()), remoteJarPath);
- resources.put(bootStrapJar, remoteJarPath);
+ resources.put(jobJar.getName(), remoteJarPath);
}
- public static Path get(URL url) {
- return resources.get(url);
- }
-
- public static Map<String, LocalResource> getTezResources(Set<URL> urls) throws Exception {
+ public static Map<String, LocalResource> getTezResources(Set<String> resourceNames) throws Exception {
Map<String, LocalResource> tezResources = new HashMap<String, LocalResource>();
- for (URL url : urls) {
- if (!resources.containsKey(url)) {
- addLocalResource(url);
- }
-
- // Extract the resource name from the URL, which will be
- // symlinked in the container's working directory.
- String resourceName = getTezResourceName(url);
- Path resourcePath = resources.get(url);
-
+ for (String resourceName : resourceNames) {
+ // The resource name will be symlinked to the resource path in the
+ // container's working directory.
+ Path resourcePath = resources.get(resourceName);
FileStatus fstat = remoteFs.getFileStatus(resourcePath);
+
LocalResource tezResource = LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(fstat.getPath()),
LocalResourceType.FILE,
LocalResourceVisibility.APPLICATION,
fstat.getLen(),
fstat.getModificationTime());
+
tezResources.put(resourceName, tezResource);
}
return tezResources;
}
-
- // If the URL has a fragment at the end, use that as the resource name.
- // This will allow for resource aliasing in a CACHE statement.
- private static String getTezResourceName(URL resourceUrl) throws Exception {
- String resourcePath = resourceUrl.getFile();
- int aliasIndex = resourcePath.indexOf("#");
-
- if (aliasIndex != -1 && aliasIndex < (resourcePath.length() - 1)) {
- return resourcePath.substring(aliasIndex + 1);
- }
-
- return resourcePath.substring(resourcePath.lastIndexOf("/") + 1);
- }
}