You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/03/01 08:18:33 UTC

svn commit: r1573128 - in /pig/branches/tez: src/org/apache/pig/backend/hadoop/executionengine/tez/ test/org/apache/pig/tez/

Author: cheolsoo
Date: Sat Mar  1 07:18:33 2014
New Revision: 1573128

URL: http://svn.apache.org/r1573128
Log:
PIG-3785: TezResourceManager should not be a singleton (daijy)

Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
    pig/branches/tez/test/org/apache/pig/tez/TestSecondarySortTez.java
    pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
    pig/branches/tez/test/org/apache/pig/tez/TestTezJobControlCompiler.java

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1573128&r1=1573127&r2=1573128&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Sat Mar  1 07:18:33 2014
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Stack;
@@ -142,6 +141,8 @@ public class TezCompiler extends PhyPlan
     private UDFFinder udfFinder;
 
     private Map<PhysicalOperator, TezOperator> phyToTezOpMap;
+    
+    private TezResourceManager tezResourceManager;
 
     public static final String USER_COMPARATOR_MARKER = "user.comparator.func:";
     public static final String FILE_CONCATENATION_THRESHOLD = "pig.files.concatenation.threshold";
@@ -152,14 +153,16 @@ public class TezCompiler extends PhyPlan
 
     private POLocalRearrangeTezFactory localRearrangeFactory;
 
-    public TezCompiler(PhysicalPlan plan, PigContext pigContext)
+    public TezCompiler(PhysicalPlan plan, PigContext pigContext, TezResourceManager tezResourceManager)
             throws TezCompilerException {
         super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
         this.plan = plan;
         this.pigContext = pigContext;
+        this.tezResourceManager = tezResourceManager;
+        
         pigProperties = pigContext.getProperties();
         splitsSeen = Maps.newHashMap();
-        tezPlan = new TezOperPlan();
+        tezPlan = new TezOperPlan(tezResourceManager);
         nig = NodeIdGenerator.getGenerator();
         udfFinder = new UDFFinder();
         List<PhysicalOperator> roots = plan.getRoots();
@@ -186,7 +189,7 @@ public class TezCompiler extends PhyPlan
 
     // Segment a single DAG into a DAG graph
     public TezPlanContainer getPlanContainer() throws PlanException {
-        TezPlanContainer tezPlanContainer = new TezPlanContainer(pigContext);
+        TezPlanContainer tezPlanContainer = new TezPlanContainer(pigContext, tezResourceManager);
         TezPlanContainerNode node = new TezPlanContainerNode(OperatorKey.genOpKey(scope), tezPlan);
         tezPlanContainer.add(node);
         tezPlanContainer.split(node);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1573128&r1=1573127&r2=1573128&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Sat Mar  1 07:18:33 2014
@@ -57,7 +57,7 @@ public class TezLauncher extends Launche
 
         Path stagingDir = FileLocalizer.getTemporaryPath(pc, "-tez");
 
-        TezResourceManager.initialize(stagingDir, pc, conf);
+        TezResourceManager tezResourceManager = new TezResourceManager(stagingDir, pc, conf);
 
         conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
 
@@ -67,7 +67,7 @@ public class TezLauncher extends Launche
         PigStats.start(tezStats);
 
         TezJobControlCompiler jcc = new TezJobControlCompiler(pc, conf);
-        TezPlanContainer tezPlanContainer = compile(php, pc);
+        TezPlanContainer tezPlanContainer = compile(php, pc, tezResourceManager);
 
         TezOperPlan tezPlan;
 
@@ -132,7 +132,7 @@ public class TezLauncher extends Launche
             String format, boolean verbose) throws PlanException,
             VisitorException, IOException {
         log.debug("Entering TezLauncher.explain");
-        TezPlanContainer tezPlanContainer = compile(php, pc);
+        TezPlanContainer tezPlanContainer = compile(php, pc, null);
 
         if (format.equals("text")) {
             TezPlanContainerPrinter printer = new TezPlanContainerPrinter(ps, tezPlanContainer);
@@ -144,9 +144,9 @@ public class TezLauncher extends Launche
         }
     }
 
-    public TezPlanContainer compile(PhysicalPlan php, PigContext pc)
+    public TezPlanContainer compile(PhysicalPlan php, PigContext pc, TezResourceManager tezResourceManager)
             throws PlanException, IOException, VisitorException {
-        TezCompiler comp = new TezCompiler(php, pc);
+        TezCompiler comp = new TezCompiler(php, pc, tezResourceManager);
         TezOperPlan tezPlan = comp.compile();
         boolean nocombiner = Boolean.parseBoolean(pc.getProperties().getProperty(
                 PigConfiguration.PROP_NO_COMBINER, "false"));

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1573128&r1=1573127&r2=1573128&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Sat Mar  1 07:18:33 2014
@@ -41,9 +41,12 @@ public class TezOperPlan extends Operato
 
     private static final long serialVersionUID = 1L;
 
+    private TezResourceManager tezResourceManager;
+
     private Map<String, Path> extraResources = new HashMap<String, Path>();
 
-    public TezOperPlan() {
+    public TezOperPlan(TezResourceManager tezResourceManager) {
+        this.tezResourceManager = tezResourceManager;
     }
 
     @Override
@@ -66,7 +69,7 @@ public class TezOperPlan extends Operato
         String resourceName = resourcePath.getName();
 
         if (!extraResources.containsKey(resourceName)) {
-            Path remoteFsPath = TezResourceManager.addTezResource(url);
+            Path remoteFsPath = tezResourceManager.addTezResource(url);
             extraResources.put(resourceName, remoteFsPath);
         }
     }
@@ -74,7 +77,7 @@ public class TezOperPlan extends Operato
     // Add extra plan-specific local resources already present in the remote FS
     public void addExtraResource(String resourceName, Path remoteFsPath) throws IOException {
         if (!extraResources.containsKey(resourceName)) {
-            TezResourceManager.addTezResource(resourceName, remoteFsPath);
+            tezResourceManager.addTezResource(resourceName, remoteFsPath);
             extraResources.put(resourceName, remoteFsPath);
         }
     }
@@ -89,7 +92,7 @@ public class TezOperPlan extends Operato
         addShipResources(streamVisitor.getShipFiles());
         addCacheResources(streamVisitor.getCacheFiles());
 
-        return TezResourceManager.getTezResources(extraResources.keySet());
+        return tezResourceManager.getTezResources(extraResources.keySet());
     }
 
     // In the statement "SHIP('/home/foo')" we'll map the resource name foo to

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java?rev=1573128&r1=1573127&r2=1573128&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java Sat Mar  1 07:18:33 2014
@@ -38,10 +38,12 @@ import org.apache.pig.impl.util.JarManag
 
 public class TezPlanContainer extends OperatorPlan<TezPlanContainerNode> {
     private static final long serialVersionUID = 1L;
+    private TezResourceManager tezResourceManager;
     private PigContext pigContext;
 
-    public TezPlanContainer(PigContext pigContext) {
+    public TezPlanContainer(PigContext pigContext, TezResourceManager tezResourceManager) {
         this.pigContext = pigContext;
+        this.tezResourceManager = tezResourceManager;
     }
 
     // Add the Pig jar and the UDF jars as AM resources (all DAG's in the planContainer
@@ -50,7 +52,7 @@ public class TezPlanContainer extends Op
     public Map<String, LocalResource> getLocalResources() throws Exception {
         Set<URL> jarLists = new HashSet<URL>();
 
-        jarLists.add(TezResourceManager.getBootStrapJar());
+        jarLists.add(tezResourceManager.getBootStrapJar());
 
         // In MR Pig the extra jars and script jars get put in Distributed Cache, but
         // in Tez we'll add them as local resources.
@@ -76,7 +78,7 @@ public class TezPlanContainer extends Op
         Set<String> udfs = tezPlanContainerUDFCollector.getUdfs();
 
         for (String func : udfs) {
-            Class clazz = pigContext.getClassForAlias(func);
+            Class<?> clazz = pigContext.getClassForAlias(func);
             if (clazz != null) {
                 String jarName = JarManager.findContainingJar(clazz);
                 if (jarName == null) {
@@ -99,7 +101,7 @@ public class TezPlanContainer extends Op
         //     }
         // }
 
-        return TezResourceManager.addTezResources(jarLists);
+        return tezResourceManager.addTezResources(jarLists);
     }
 
     public TezOperPlan getNextPlan(List<TezOperPlan> processedPlans) {
@@ -155,7 +157,7 @@ public class TezPlanContainer extends Op
         if (operToSegment != null) {
             for (TezOperator succ : succs) {
                 tezOperPlan.disconnect(operToSegment, succ);
-                TezOperPlan newOperPlan = new TezOperPlan();
+                TezOperPlan newOperPlan = new TezOperPlan(tezResourceManager);
                 List<TezPlanContainerNode> containerSuccs = new ArrayList<TezPlanContainerNode>();
                 if (getSuccessors(planNode)!=null) {
                     containerSuccs.addAll(getSuccessors(planNode));

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1573128&r1=1573127&r2=1573128&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Sat Mar  1 07:18:33 2014
@@ -38,31 +38,31 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.JarManager;
 
 public class TezResourceManager {
-    private static Path stagingDir;
-    private static PigContext pigContext;
-    private static Configuration conf;
-    private static URL bootStrapJar;
-    private static FileSystem remoteFs;
-    public static Map<String, Path> resources = new HashMap<String, Path>();
+    private Path stagingDir;
+    private PigContext pigContext;
+    private Configuration conf;
+    private URL bootStrapJar;
+    private FileSystem remoteFs;
+    public Map<String, Path> resources = new HashMap<String, Path>();
 
-    public static URL getBootStrapJar() {
+    public URL getBootStrapJar() {
         return bootStrapJar;
     }
 
-    public static void initialize(Path stagingDir, PigContext pigContext, Configuration conf) throws IOException {
+    public TezResourceManager(Path stagingDir, PigContext pigContext, Configuration conf) throws IOException {
         resources.clear();
-        TezResourceManager.stagingDir = stagingDir;
-        TezResourceManager.pigContext = pigContext;
-        TezResourceManager.conf = conf;
+        this.stagingDir = stagingDir;
+        this.pigContext = pigContext;
+        this.conf = conf;
         String jar = JarManager.findContainingJar(org.apache.pig.Main.class);
-        TezResourceManager.bootStrapJar = new File(jar).toURI().toURL();
+        this.bootStrapJar = new File(jar).toURI().toURL();
         remoteFs = FileSystem.get(conf);
         addBootStrapJar();
     }
 
     // Add files from the source FS as local resources. The resource name will
     // be the same as the file name.
-    public static Path addTezResource(URL url) throws IOException {
+    public Path addTezResource(URL url) throws IOException {
         Path resourcePath = new Path(url.getFile());
         String resourceName = resourcePath.getName();
 
@@ -81,13 +81,13 @@ public class TezResourceManager {
     // resource name to be different from the file name to support resource
     // aliasing in a CACHE statement (and to allow the same file to be aliased
     // with multiple resource names).
-    public static void addTezResource(String resourceName, Path remoteFsPath) throws IOException {
+    public void addTezResource(String resourceName, Path remoteFsPath) throws IOException {
         if (!resources.containsKey(resourceName)) {
             resources.put(resourceName, remoteFsPath);
         }
     }
 
-    public static Map<String, LocalResource> addTezResources(Set<URL> resources) throws Exception {
+    public Map<String, LocalResource> addTezResources(Set<URL> resources) throws Exception {
         Set<String> resourceNames = new HashSet<String>();
         for (URL url : resources) {
             addTezResource(url);
@@ -96,7 +96,7 @@ public class TezResourceManager {
         return getTezResources(resourceNames);
     }
 
-    public static void addBootStrapJar() throws IOException {
+    public void addBootStrapJar() throws IOException {
         if (resources.containsKey(bootStrapJar)) {
             return;
         }
@@ -113,7 +113,7 @@ public class TezResourceManager {
         resources.put(new Path(bootStrapJar.getFile()).getName(), remoteJarPath);
     }
 
-    public static Map<String, LocalResource> getTezResources(Set<String> resourceNames) throws Exception {
+    public Map<String, LocalResource> getTezResources(Set<String> resourceNames) throws Exception {
         Map<String, LocalResource> tezResources = new HashMap<String, LocalResource>();
         for (String resourceName : resourceNames) {
             // The resource name will be symlinked to the resource path in the

Modified: pig/branches/tez/test/org/apache/pig/tez/TestSecondarySortTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/tez/TestSecondarySortTez.java?rev=1573128&r1=1573127&r2=1573128&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/tez/TestSecondarySortTez.java (original)
+++ pig/branches/tez/test/org/apache/pig/tez/TestSecondarySortTez.java Sat Mar  1 07:18:33 2014
@@ -44,7 +44,7 @@ public class TestSecondarySortTez extend
     public SecondaryKeyOptimizer visitSecondaryKeyOptimizer(String query)
             throws Exception, VisitorException {
         PhysicalPlan pp = Util.buildPp(pigServer, query);
-        TezCompiler comp = new TezCompiler(pp, pc);
+        TezCompiler comp = new TezCompiler(pp, pc, null);
         TezOperPlan tezPlan = comp.compile();
         boolean nocombiner = Boolean.parseBoolean(pc.getProperties().getProperty(
                 PigConfiguration.PROP_NO_COMBINER, "false"));

Modified: pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java?rev=1573128&r1=1573127&r2=1573128&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java Sat Mar  1 07:18:33 2014
@@ -316,7 +316,7 @@ public class TestTezCompiler {
     private void run(String query, String expectedFile) throws Exception {
         PhysicalPlan pp = Util.buildPp(pigServer, query);
         TezLauncher launcher = new TezLauncher();
-        TezPlanContainer tezPlanContainer = launcher.compile(pp, pc);
+        TezPlanContainer tezPlanContainer = launcher.compile(pp, pc, null);
 
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         PrintStream ps = new PrintStream(baos);

Modified: pig/branches/tez/test/org/apache/pig/tez/TestTezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/tez/TestTezJobControlCompiler.java?rev=1573128&r1=1573127&r2=1573128&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/tez/TestTezJobControlCompiler.java (original)
+++ pig/branches/tez/test/org/apache/pig/tez/TestTezJobControlCompiler.java Sat Mar  1 07:18:33 2014
@@ -169,7 +169,7 @@ public class TestTezJobControlCompiler {
 
     private Pair<TezOperPlan, DAG> compile(String query) throws Exception {
         PhysicalPlan pp = Util.buildPp(pigServer, query);
-        TezCompiler comp = new TezCompiler(pp, pc);
+        TezCompiler comp = new TezCompiler(pp, pc, null);
         TezOperPlan tezPlan = comp.compile();
         TezJobControlCompiler jobComp = new TezJobControlCompiler(pc, new Configuration());
         DAG dag = jobComp.buildDAG(tezPlan, new HashMap<String, LocalResource>());