You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/03/01 08:18:33 UTC
svn commit: r1573128 - in /pig/branches/tez:
src/org/apache/pig/backend/hadoop/executionengine/tez/
test/org/apache/pig/tez/
Author: cheolsoo
Date: Sat Mar 1 07:18:33 2014
New Revision: 1573128
URL: http://svn.apache.org/r1573128
Log:
PIG-3785: TezResourceManager should not be a singleton (daijy)
Modified:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
pig/branches/tez/test/org/apache/pig/tez/TestSecondarySortTez.java
pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
pig/branches/tez/test/org/apache/pig/tez/TestTezJobControlCompiler.java
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1573128&r1=1573127&r2=1573128&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Sat Mar 1 07:18:33 2014
@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.Stack;
@@ -142,6 +141,8 @@ public class TezCompiler extends PhyPlan
private UDFFinder udfFinder;
private Map<PhysicalOperator, TezOperator> phyToTezOpMap;
+
+ private TezResourceManager tezResourceManager;
public static final String USER_COMPARATOR_MARKER = "user.comparator.func:";
public static final String FILE_CONCATENATION_THRESHOLD = "pig.files.concatenation.threshold";
@@ -152,14 +153,16 @@ public class TezCompiler extends PhyPlan
private POLocalRearrangeTezFactory localRearrangeFactory;
- public TezCompiler(PhysicalPlan plan, PigContext pigContext)
+ public TezCompiler(PhysicalPlan plan, PigContext pigContext, TezResourceManager tezResourceManager)
throws TezCompilerException {
super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
this.plan = plan;
this.pigContext = pigContext;
+ this.tezResourceManager = tezResourceManager;
+
pigProperties = pigContext.getProperties();
splitsSeen = Maps.newHashMap();
- tezPlan = new TezOperPlan();
+ tezPlan = new TezOperPlan(tezResourceManager);
nig = NodeIdGenerator.getGenerator();
udfFinder = new UDFFinder();
List<PhysicalOperator> roots = plan.getRoots();
@@ -186,7 +189,7 @@ public class TezCompiler extends PhyPlan
// Segment a single DAG into a DAG graph
public TezPlanContainer getPlanContainer() throws PlanException {
- TezPlanContainer tezPlanContainer = new TezPlanContainer(pigContext);
+ TezPlanContainer tezPlanContainer = new TezPlanContainer(pigContext, tezResourceManager);
TezPlanContainerNode node = new TezPlanContainerNode(OperatorKey.genOpKey(scope), tezPlan);
tezPlanContainer.add(node);
tezPlanContainer.split(node);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1573128&r1=1573127&r2=1573128&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Sat Mar 1 07:18:33 2014
@@ -57,7 +57,7 @@ public class TezLauncher extends Launche
Path stagingDir = FileLocalizer.getTemporaryPath(pc, "-tez");
- TezResourceManager.initialize(stagingDir, pc, conf);
+ TezResourceManager tezResourceManager = new TezResourceManager(stagingDir, pc, conf);
conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
@@ -67,7 +67,7 @@ public class TezLauncher extends Launche
PigStats.start(tezStats);
TezJobControlCompiler jcc = new TezJobControlCompiler(pc, conf);
- TezPlanContainer tezPlanContainer = compile(php, pc);
+ TezPlanContainer tezPlanContainer = compile(php, pc, tezResourceManager);
TezOperPlan tezPlan;
@@ -132,7 +132,7 @@ public class TezLauncher extends Launche
String format, boolean verbose) throws PlanException,
VisitorException, IOException {
log.debug("Entering TezLauncher.explain");
- TezPlanContainer tezPlanContainer = compile(php, pc);
+ TezPlanContainer tezPlanContainer = compile(php, pc, null);
if (format.equals("text")) {
TezPlanContainerPrinter printer = new TezPlanContainerPrinter(ps, tezPlanContainer);
@@ -144,9 +144,9 @@ public class TezLauncher extends Launche
}
}
- public TezPlanContainer compile(PhysicalPlan php, PigContext pc)
+ public TezPlanContainer compile(PhysicalPlan php, PigContext pc, TezResourceManager tezResourceManager)
throws PlanException, IOException, VisitorException {
- TezCompiler comp = new TezCompiler(php, pc);
+ TezCompiler comp = new TezCompiler(php, pc, tezResourceManager);
TezOperPlan tezPlan = comp.compile();
boolean nocombiner = Boolean.parseBoolean(pc.getProperties().getProperty(
PigConfiguration.PROP_NO_COMBINER, "false"));
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1573128&r1=1573127&r2=1573128&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Sat Mar 1 07:18:33 2014
@@ -41,9 +41,12 @@ public class TezOperPlan extends Operato
private static final long serialVersionUID = 1L;
+ private TezResourceManager tezResourceManager;
+
private Map<String, Path> extraResources = new HashMap<String, Path>();
- public TezOperPlan() {
+ public TezOperPlan(TezResourceManager tezResourceManager) {
+ this.tezResourceManager = tezResourceManager;
}
@Override
@@ -66,7 +69,7 @@ public class TezOperPlan extends Operato
String resourceName = resourcePath.getName();
if (!extraResources.containsKey(resourceName)) {
- Path remoteFsPath = TezResourceManager.addTezResource(url);
+ Path remoteFsPath = tezResourceManager.addTezResource(url);
extraResources.put(resourceName, remoteFsPath);
}
}
@@ -74,7 +77,7 @@ public class TezOperPlan extends Operato
// Add extra plan-specific local resources already present in the remote FS
public void addExtraResource(String resourceName, Path remoteFsPath) throws IOException {
if (!extraResources.containsKey(resourceName)) {
- TezResourceManager.addTezResource(resourceName, remoteFsPath);
+ tezResourceManager.addTezResource(resourceName, remoteFsPath);
extraResources.put(resourceName, remoteFsPath);
}
}
@@ -89,7 +92,7 @@ public class TezOperPlan extends Operato
addShipResources(streamVisitor.getShipFiles());
addCacheResources(streamVisitor.getCacheFiles());
- return TezResourceManager.getTezResources(extraResources.keySet());
+ return tezResourceManager.getTezResources(extraResources.keySet());
}
// In the statement "SHIP('/home/foo')" we'll map the resource name foo to
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java?rev=1573128&r1=1573127&r2=1573128&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java Sat Mar 1 07:18:33 2014
@@ -38,10 +38,12 @@ import org.apache.pig.impl.util.JarManag
public class TezPlanContainer extends OperatorPlan<TezPlanContainerNode> {
private static final long serialVersionUID = 1L;
+ private TezResourceManager tezResourceManager;
private PigContext pigContext;
- public TezPlanContainer(PigContext pigContext) {
+ public TezPlanContainer(PigContext pigContext, TezResourceManager tezResourceManager) {
this.pigContext = pigContext;
+ this.tezResourceManager = tezResourceManager;
}
// Add the Pig jar and the UDF jars as AM resources (all DAG's in the planContainer
@@ -50,7 +52,7 @@ public class TezPlanContainer extends Op
public Map<String, LocalResource> getLocalResources() throws Exception {
Set<URL> jarLists = new HashSet<URL>();
- jarLists.add(TezResourceManager.getBootStrapJar());
+ jarLists.add(tezResourceManager.getBootStrapJar());
// In MR Pig the extra jars and script jars get put in Distributed Cache, but
// in Tez we'll add them as local resources.
@@ -76,7 +78,7 @@ public class TezPlanContainer extends Op
Set<String> udfs = tezPlanContainerUDFCollector.getUdfs();
for (String func : udfs) {
- Class clazz = pigContext.getClassForAlias(func);
+ Class<?> clazz = pigContext.getClassForAlias(func);
if (clazz != null) {
String jarName = JarManager.findContainingJar(clazz);
if (jarName == null) {
@@ -99,7 +101,7 @@ public class TezPlanContainer extends Op
// }
// }
- return TezResourceManager.addTezResources(jarLists);
+ return tezResourceManager.addTezResources(jarLists);
}
public TezOperPlan getNextPlan(List<TezOperPlan> processedPlans) {
@@ -155,7 +157,7 @@ public class TezPlanContainer extends Op
if (operToSegment != null) {
for (TezOperator succ : succs) {
tezOperPlan.disconnect(operToSegment, succ);
- TezOperPlan newOperPlan = new TezOperPlan();
+ TezOperPlan newOperPlan = new TezOperPlan(tezResourceManager);
List<TezPlanContainerNode> containerSuccs = new ArrayList<TezPlanContainerNode>();
if (getSuccessors(planNode)!=null) {
containerSuccs.addAll(getSuccessors(planNode));
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1573128&r1=1573127&r2=1573128&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Sat Mar 1 07:18:33 2014
@@ -38,31 +38,31 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.JarManager;
public class TezResourceManager {
- private static Path stagingDir;
- private static PigContext pigContext;
- private static Configuration conf;
- private static URL bootStrapJar;
- private static FileSystem remoteFs;
- public static Map<String, Path> resources = new HashMap<String, Path>();
+ private Path stagingDir;
+ private PigContext pigContext;
+ private Configuration conf;
+ private URL bootStrapJar;
+ private FileSystem remoteFs;
+ public Map<String, Path> resources = new HashMap<String, Path>();
- public static URL getBootStrapJar() {
+ public URL getBootStrapJar() {
return bootStrapJar;
}
- public static void initialize(Path stagingDir, PigContext pigContext, Configuration conf) throws IOException {
+ public TezResourceManager(Path stagingDir, PigContext pigContext, Configuration conf) throws IOException {
resources.clear();
- TezResourceManager.stagingDir = stagingDir;
- TezResourceManager.pigContext = pigContext;
- TezResourceManager.conf = conf;
+ this.stagingDir = stagingDir;
+ this.pigContext = pigContext;
+ this.conf = conf;
String jar = JarManager.findContainingJar(org.apache.pig.Main.class);
- TezResourceManager.bootStrapJar = new File(jar).toURI().toURL();
+ this.bootStrapJar = new File(jar).toURI().toURL();
remoteFs = FileSystem.get(conf);
addBootStrapJar();
}
// Add files from the source FS as local resources. The resource name will
// be the same as the file name.
- public static Path addTezResource(URL url) throws IOException {
+ public Path addTezResource(URL url) throws IOException {
Path resourcePath = new Path(url.getFile());
String resourceName = resourcePath.getName();
@@ -81,13 +81,13 @@ public class TezResourceManager {
// resource name to be different from the file name to support resource
// aliasing in a CACHE statement (and to allow the same file to be aliased
// with multiple resource names).
- public static void addTezResource(String resourceName, Path remoteFsPath) throws IOException {
+ public void addTezResource(String resourceName, Path remoteFsPath) throws IOException {
if (!resources.containsKey(resourceName)) {
resources.put(resourceName, remoteFsPath);
}
}
- public static Map<String, LocalResource> addTezResources(Set<URL> resources) throws Exception {
+ public Map<String, LocalResource> addTezResources(Set<URL> resources) throws Exception {
Set<String> resourceNames = new HashSet<String>();
for (URL url : resources) {
addTezResource(url);
@@ -96,7 +96,7 @@ public class TezResourceManager {
return getTezResources(resourceNames);
}
- public static void addBootStrapJar() throws IOException {
+ public void addBootStrapJar() throws IOException {
if (resources.containsKey(bootStrapJar)) {
return;
}
@@ -113,7 +113,7 @@ public class TezResourceManager {
resources.put(new Path(bootStrapJar.getFile()).getName(), remoteJarPath);
}
- public static Map<String, LocalResource> getTezResources(Set<String> resourceNames) throws Exception {
+ public Map<String, LocalResource> getTezResources(Set<String> resourceNames) throws Exception {
Map<String, LocalResource> tezResources = new HashMap<String, LocalResource>();
for (String resourceName : resourceNames) {
// The resource name will be symlinked to the resource path in the
Modified: pig/branches/tez/test/org/apache/pig/tez/TestSecondarySortTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/tez/TestSecondarySortTez.java?rev=1573128&r1=1573127&r2=1573128&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/tez/TestSecondarySortTez.java (original)
+++ pig/branches/tez/test/org/apache/pig/tez/TestSecondarySortTez.java Sat Mar 1 07:18:33 2014
@@ -44,7 +44,7 @@ public class TestSecondarySortTez extend
public SecondaryKeyOptimizer visitSecondaryKeyOptimizer(String query)
throws Exception, VisitorException {
PhysicalPlan pp = Util.buildPp(pigServer, query);
- TezCompiler comp = new TezCompiler(pp, pc);
+ TezCompiler comp = new TezCompiler(pp, pc, null);
TezOperPlan tezPlan = comp.compile();
boolean nocombiner = Boolean.parseBoolean(pc.getProperties().getProperty(
PigConfiguration.PROP_NO_COMBINER, "false"));
Modified: pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java?rev=1573128&r1=1573127&r2=1573128&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java Sat Mar 1 07:18:33 2014
@@ -316,7 +316,7 @@ public class TestTezCompiler {
private void run(String query, String expectedFile) throws Exception {
PhysicalPlan pp = Util.buildPp(pigServer, query);
TezLauncher launcher = new TezLauncher();
- TezPlanContainer tezPlanContainer = launcher.compile(pp, pc);
+ TezPlanContainer tezPlanContainer = launcher.compile(pp, pc, null);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
Modified: pig/branches/tez/test/org/apache/pig/tez/TestTezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/tez/TestTezJobControlCompiler.java?rev=1573128&r1=1573127&r2=1573128&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/tez/TestTezJobControlCompiler.java (original)
+++ pig/branches/tez/test/org/apache/pig/tez/TestTezJobControlCompiler.java Sat Mar 1 07:18:33 2014
@@ -169,7 +169,7 @@ public class TestTezJobControlCompiler {
private Pair<TezOperPlan, DAG> compile(String query) throws Exception {
PhysicalPlan pp = Util.buildPp(pigServer, query);
- TezCompiler comp = new TezCompiler(pp, pc);
+ TezCompiler comp = new TezCompiler(pp, pc, null);
TezOperPlan tezPlan = comp.compile();
TezJobControlCompiler jobComp = new TezJobControlCompiler(pc, new Configuration());
DAG dag = jobComp.buildDAG(tezPlan, new HashMap<String, LocalResource>());