You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/02/17 00:42:19 UTC

svn commit: r1568862 - in /pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez: TezJobControlCompiler.java TezOperPlan.java TezPlanContainer.java TezResourceManager.java

Author: rohini
Date: Sun Feb 16 23:42:19 2014
New Revision: 1568862

URL: http://svn.apache.org/r1568862
Log:
PIG-3629: Implement STREAM in Tez - Fix issues with localization of resources (abain via rohini)

Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java?rev=1568862&r1=1568861&r2=1568862&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java Sun Feb 16 23:42:19 2014
@@ -97,7 +97,7 @@ public class TezJobControlCompiler {
         try {
             Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
             localResources.putAll(planContainer.getLocalResources());
-            localResources.putAll(tezPlan.getLocalExtraResources());
+            localResources.putAll(tezPlan.getExtraResources());
             TezDAG tezDag = buildDAG(tezPlan, localResources);
             return new TezJob(tezConf, tezDag, localResources);
         } catch (Exception e) {

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1568862&r1=1568861&r2=1568862&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Sun Feb 16 23:42:19 2014
@@ -21,15 +21,14 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.net.URI;
+import java.net.URL;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -42,7 +41,7 @@ public class TezOperPlan extends Operato
 
     private static final long serialVersionUID = 1L;
 
-    private Map<URL, Path> extraResources = new HashMap<URL, Path>();
+    private Map<String, Path> extraResources = new HashMap<String, Path>();
 
     public TezOperPlan() {
     }
@@ -61,73 +60,65 @@ public class TezOperPlan extends Operato
         return baos.toString();
     }
 
-    // Add extra plan-specific local resources to HDFS
+    // Add extra plan-specific local resources from the source FS
     public void addExtraResource(URL url) throws IOException {
-        if (!extraResources.containsKey(url)) {
-            Path pathInHDFS = TezResourceManager.addLocalResource(url);
-            extraResources.put(url, pathInHDFS);
+        Path resourcePath = new Path(url.getFile());
+        String resourceName = resourcePath.getName();
+
+        if (!extraResources.containsKey(resourceName)) {
+            Path remoteFsPath = TezResourceManager.addTezResource(url);
+            extraResources.put(resourceName, remoteFsPath);
         }
     }
 
-    // Add extra plan-specific local resources already present in HDFS
-    public void addExtraResource(URL url, Path pathInHDFS) throws IOException {
-        if (!extraResources.containsKey(url)) {
-            TezResourceManager.addLocalResource(url, pathInHDFS);
-            extraResources.put(url, pathInHDFS);
+    // Add extra plan-specific local resources already present in the remote FS
+    public void addExtraResource(String resourceName, Path remoteFsPath) throws IOException {
+        if (!extraResources.containsKey(resourceName)) {
+            TezResourceManager.addTezResource(resourceName, remoteFsPath);
+            extraResources.put(resourceName, remoteFsPath);
         }
     }
 
     // Get the plan-specific resources
-    public Map<String, LocalResource> getLocalExtraResources() throws Exception {
+    public Map<String, LocalResource> getExtraResources() throws Exception {
         TezPOStreamVisitor streamVisitor = new TezPOStreamVisitor(this);
         streamVisitor.visit();
 
         // In a STREAM add the files specified in SHIP and CACHE
         // as local resources for the plan.
-        addFileResources(streamVisitor.getShipFiles());
-        addHdfsResources(streamVisitor.getCacheFiles());
+        addShipResources(streamVisitor.getShipFiles());
+        addCacheResources(streamVisitor.getCacheFiles());
 
-        Set<URL> resourceUrls = extraResources.keySet();
-        return TezResourceManager.getTezResources(resourceUrls);
+        return TezResourceManager.getTezResources(extraResources.keySet());
     }
 
-    private void addFileResources(Set<String> fileNames) throws IOException {
-        Set<URL> fileUrls = new HashSet<URL>();
-
+    // In the statement "SHIP('/home/foo')" we'll map the resource name foo to
+    // the file that has been copied to the staging directory in the remote FS.
+    private void addShipResources(Set<String> fileNames) throws IOException {
         for (String fileName : fileNames) {
             fileName = fileName.trim();
             if (fileName.length() > 0) {
-                fileUrls.add(ConverterUtils.getYarnUrlFromURI(new File(fileName).toURI()));
+                URL url = new File(fileName).toURI().toURL();
+                addExtraResource(url);
             }
         }
-
-        for (URL url : fileUrls) {
-            addExtraResource(url);
-        }
     }
 
     // In the statement "CACHE('/input/data.txt#alias.txt')" we'll map the
-    // URL 'hdfs:/input/data.txt#alias.txt' to the actual resource path in
-    // HDFS at '/input/data.txt'.
-    private void addHdfsResources(Set<String> fileNames) throws Exception {
-        Map<URL, Path> resourceMap = new HashMap<URL, Path>();
-
+    // resource name alias.txt to the actual resource path in the remote FS
+    // at '/input/data.txt'.
+    private void addCacheResources(Set<String> fileNames) throws Exception {
         for (String fileName : fileNames) {
             fileName = fileName.trim();
             if (fileName.length() > 0) {
-                URL urlOnHDFS = ConverterUtils.getYarnUrlFromURI(new File(fileName).toURI());
-                urlOnHDFS.setScheme("hdfs");
+                URI resourceURI = new URI(fileName);
+                String fragment = resourceURI.getFragment();
 
-                // Get the path on HDFS without a fragment at the end
-                int aliasIndex = fileName.indexOf("#");
-                String path = (aliasIndex == -1) ? fileName : fileName.substring(0, aliasIndex);
-                Path pathOnHDFS = new Path(path);
-                resourceMap.put(urlOnHDFS, pathOnHDFS);
-            }
-        }
+                Path remoteFsPath = new Path(resourceURI.getPath());
+                String resourceName = (fragment != null && fragment.length() > 0) ? fragment : remoteFsPath.getName();
 
-        for (Map.Entry<URL, Path> entry : resourceMap.entrySet()) {
-            addExtraResource(entry.getKey(), entry.getValue());
+                addExtraResource(resourceName, remoteFsPath);
+            }
         }
     }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java?rev=1568862&r1=1568861&r2=1568862&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java Sun Feb 16 23:42:19 2014
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.PrintStream;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -27,8 +28,6 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -55,12 +54,12 @@ public class TezPlanContainer extends Op
 
         // In MR Pig the extra jars and script jars get put in Distributed Cache, but
         // in Tez we'll add them as local resources.
-        for (java.net.URL jarUrl : pigContext.extraJars) {
-            jarLists.add(ConverterUtils.getYarnUrlFromURI(jarUrl.toURI()));
+        for (URL jarUrl : pigContext.extraJars) {
+            jarLists.add(jarUrl);
         }
 
         for (String jarFile : pigContext.scriptJars) {
-            jarLists.add(ConverterUtils.getYarnUrlFromURI(new File(jarFile).toURI()));
+            jarLists.add(new File(jarFile).toURI().toURL());
         }
 
         // Script files for non-Java UDF's are added to the Job.jar by the JarManager class,
@@ -68,7 +67,7 @@ public class TezPlanContainer extends Op
         // to the GroovyScriptEngine (see JarManager.java for comments).
         for (Map.Entry<String, File> scriptFile : pigContext.getScriptFiles().entrySet()) {
             if (scriptFile.getKey().endsWith(".groovy")) {
-                jarLists.add(ConverterUtils.getYarnUrlFromURI(scriptFile.getValue().toURI()));
+                jarLists.add(scriptFile.getValue().toURI().toURL());
             }
         }
 
@@ -87,21 +86,20 @@ public class TezPlanContainer extends Op
                     // avoid NPE.
                     continue;
                 }
-
-                URL jarUrl = ConverterUtils.getYarnUrlFromURI(new File(jarName).toURI());
+                URL jarUrl = new File(jarName).toURI().toURL();
                 jarLists.add(jarUrl);
-
-                // Streaming UDF's are not working under Hadoop 2 (PIG-3478), so don't bother adding
-                // resources for them yet.
-                // if ("StreamingUDF".equals(clazz.getSimpleName())) {
-                //     for (String fileName : StreamingUDF.getResourcesForJar()) {
-                //         jarLists.add(ConverterUtils.getYarnUrlFromURI(new File(fileName).toURI()));
-                //     }
-                // }
             }
         }
 
-        return TezResourceManager.getTezResources(jarLists);
+        // Streaming UDF's are not working under Hadoop 2 (PIG-3478), so don't bother adding
+        // resources for them yet.
+        // if ("StreamingUDF".equals(clazz.getSimpleName())) {
+        //     for (String fileName : StreamingUDF.getResourcesForJar()) {
+        //         jarLists.add(new File(fileName).toURI().toURL());
+        //     }
+        // }
+
+        return TezResourceManager.addTezResources(jarLists);
     }
 
     public TezOperPlan getNextPlan(List<TezOperPlan> processedPlans) {
@@ -176,7 +174,7 @@ public class TezPlanContainer extends Op
             split(planNode);
         }
     }
-    
+
     @Override
     public String toString() {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1568862&r1=1568861&r2=1568862&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Sun Feb 16 23:42:19 2014
@@ -20,7 +20,9 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.URL;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -31,12 +33,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.pig.PigException;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.JarManager;
-import org.apache.pig.impl.util.Utils;
 
 public class TezResourceManager {
     private static Path stagingDir;
@@ -44,8 +43,7 @@ public class TezResourceManager {
     private static Configuration conf;
     private static URL bootStrapJar;
     private static FileSystem remoteFs;
-
-    public static Map<URL, Path> resources = new HashMap<URL, Path>();
+    public static Map<String, Path> resources = new HashMap<String, Path>();
 
     public static URL getBootStrapJar() {
         return bootStrapJar;
@@ -56,37 +54,45 @@ public class TezResourceManager {
         TezResourceManager.pigContext = pigContext;
         TezResourceManager.conf = conf;
         String jar = JarManager.findContainingJar(org.apache.pig.Main.class);
-        TezResourceManager.bootStrapJar = ConverterUtils.getYarnUrlFromURI(new File(jar).toURI());
+        TezResourceManager.bootStrapJar = new File(jar).toURI().toURL();
         remoteFs = FileSystem.get(conf);
         addBootStrapJar();
     }
 
-    public static Path addLocalResource(URL url) throws IOException {
-        if (!"file".equals(url.getScheme())) {
-            throw new PigException("This method should only be called with file:// resources");
-        }
+    // Add files from the source FS as local resources. The resource name will
+    // be the same as the file name.
+    public static Path addTezResource(URL url) throws IOException {
+        Path resourcePath = new Path(url.getFile());
+        String resourceName = resourcePath.getName();
 
-        if (resources.containsKey(url)) {
-            return resources.get(url);
+        if (resources.containsKey(resourceName)) {
+            return resources.get(resourceName);
         }
 
-        java.net.URL javaUrl = new java.net.URL(url.getScheme(), url.getHost(), url.getFile());
-        Path pathInHDFS = Utils.shipToHDFS(pigContext, conf, javaUrl);
-        resources.put(url, pathInHDFS);
-        return pathInHDFS;
+        // Ship the resource to the staging directory on the remote FS
+        Path remoteFsPath = remoteFs.makeQualified(new Path(stagingDir, resourceName));
+        remoteFs.copyFromLocalFile(resourcePath, remoteFsPath);
+        resources.put(resourceName, remoteFsPath);
+        return remoteFsPath;
     }
 
-    // Add files already present in HDFS as local resources. Allow the URL
-    // to be different from the path to support resource aliasing in a CACHE
-    // statement. See TezOperPlan::addHdfsResources for an example.
-    public static void addLocalResource(URL url, Path pathInHDFS) throws IOException {
-        if (!"hdfs".equals(url.getScheme())) {
-            throw new PigException("This method should only be called with hdfs:// resources");
+    // Add files already present in the remote FS as local resources. Allow the
+    // resource name to be different from the file name to support resource
+    // aliasing in a CACHE statement (and to allow the same file to be aliased
+    // with multiple resource names).
+    public static void addTezResource(String resourceName, Path remoteFsPath) throws IOException {
+        if (!resources.containsKey(resourceName)) {
+            resources.put(resourceName, remoteFsPath);
         }
+    }
 
-        if (!resources.containsKey(url)) {
-            resources.put(url, pathInHDFS);
+    public static Map<String, LocalResource> addTezResources(Set<URL> resources) throws Exception {
+        Set<String> resourceNames = new HashSet<String>();
+        for (URL url : resources) {
+            addTezResource(url);
+            resourceNames.add(new Path(url.getFile()).getName());
         }
+        return getTezResources(resourceNames);
     }
 
     public static void addBootStrapJar() throws IOException {
@@ -100,51 +106,30 @@ public class TezResourceManager {
         FileOutputStream fos = new FileOutputStream(jobJar);
         JarManager.createBootStrapJar(fos, pigContext);
 
-        // Ship the job.jar to the staging directory on hdfs
+        // Ship the job.jar to the staging directory on the remote FS
         Path remoteJarPath = remoteFs.makeQualified(new Path(stagingDir, new Path(bootStrapJar.getFile()).getName()));
         remoteFs.copyFromLocalFile(new Path(jobJar.getAbsolutePath()), remoteJarPath);
-        resources.put(bootStrapJar, remoteJarPath);
+        resources.put(jobJar.getName(), remoteJarPath);
     }
 
-    public static Path get(URL url) {
-        return resources.get(url);
-    }
-
-    public static Map<String, LocalResource> getTezResources(Set<URL> urls) throws Exception {
+    public static Map<String, LocalResource> getTezResources(Set<String> resourceNames) throws Exception {
         Map<String, LocalResource> tezResources = new HashMap<String, LocalResource>();
-        for (URL url : urls) {
-            if (!resources.containsKey(url)) {
-                addLocalResource(url);
-            }
-
-            // Extract the resource name from the URL, which will be
-            // symlinked in the container's working directory.
-            String resourceName = getTezResourceName(url);
-            Path resourcePath = resources.get(url);
-
+        for (String resourceName : resourceNames) {
+            // The resource name will be symlinked to the resource path in the
+            // container's working directory.
+            Path resourcePath = resources.get(resourceName);
             FileStatus fstat = remoteFs.getFileStatus(resourcePath);
+
             LocalResource tezResource = LocalResource.newInstance(
                     ConverterUtils.getYarnUrlFromPath(fstat.getPath()),
                     LocalResourceType.FILE,
                     LocalResourceVisibility.APPLICATION,
                     fstat.getLen(),
                     fstat.getModificationTime());
+
             tezResources.put(resourceName, tezResource);
         }
         return tezResources;
     }
-
-    // If the URL has a fragment at the end, use that as the resource name.
-    // This will allow for resource aliasing in a CACHE statement.
-    private static String getTezResourceName(URL resourceUrl) throws Exception {
-        String resourcePath = resourceUrl.getFile();
-        int aliasIndex = resourcePath.indexOf("#");
-
-        if (aliasIndex != -1 && aliasIndex < (resourcePath.length() - 1)) {
-            return resourcePath.substring(aliasIndex + 1);
-        }
-
-        return resourcePath.substring(resourcePath.lastIndexOf("/") + 1);
-    }
 }