You are viewing a plain-text version of this content. The canonical link to the original page was not preserved in this plain-text copy.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/04/14 19:57:57 UTC

svn commit: r764904 - in /hadoop/pig/branches/multiquery: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/ src/org/apache/pig/backend/hadoop/...

Author: pradeepkth
Date: Tue Apr 14 17:57:56 2009
New Revision: 764904

URL: http://svn.apache.org/viewvc?rev=764904&view=rev
Log:
PIG-627: multiquery support incremental patch (hagleitn via pradeepkth)

Modified:
    hadoop/pig/branches/multiquery/CHANGES.txt
    hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLoad.java
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java

Modified: hadoop/pig/branches/multiquery/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/CHANGES.txt?rev=764904&r1=764903&r2=764904&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/CHANGES.txt (original)
+++ hadoop/pig/branches/multiquery/CHANGES.txt Tue Apr 14 17:57:56 2009
@@ -600,3 +600,5 @@
     PIG-627: multiquery support incremental patch (hagleitn via pradeepkth)
 
     PIG-627: multiquery support incremental patch (hagleitn via pradeepkth)
+
+    PIG-627: multiquery support incremental patch (hagleitn via pradeepkth)

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java?rev=764904&r1=764903&r2=764904&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java Tue Apr 14 17:57:56 2009
@@ -884,7 +884,8 @@
             this.batchMode = batchMode;
             this.processedStores = 0;
             this.ignoreNumStores = 0;
-            this.jobName = "DefaultJobName";
+            this.jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME,
+                                                                  PigContext.JOB_NAME_PREFIX+":DefaultJobName");
             this.lp = new LogicalPlan();
         };
         
@@ -901,7 +902,7 @@
         boolean isBatchEmpty() { return processedStores == storeOpTable.keySet().size(); }
         
         void execute() throws ExecException, FrontendException {
-            pigContext.getProperties().setProperty(PigContext.JOB_NAME, PigContext.JOB_NAME_PREFIX + ":" + jobName);
+            pigContext.getProperties().setProperty(PigContext.JOB_NAME, jobName);
             PigServer.this.execute(null);
             processedStores = storeOpTable.keySet().size();
         }
@@ -911,7 +912,7 @@
         }
 
         void setJobName(String name) {
-            jobName = name;
+            jobName = PigContext.JOB_NAME_PREFIX+":"+name;
         }
 
         LogicalPlan getPlan(String alias) throws IOException {

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=764904&r1=764903&r2=764904&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Tue Apr 14 17:57:56 2009
@@ -222,10 +222,6 @@
         POPackageAnnotator pkgAnnotator = new POPackageAnnotator(plan);
         pkgAnnotator.visit();
         
-        // check whether stream operator is present
-        MRStreamHandler checker = new MRStreamHandler(plan);
-        checker.visit();
-        
         // optimize joins
         LastInputStreamingOptimizer liso = 
             new MRCompiler.LastInputStreamingOptimizer(plan, lastInputChunkSize);
@@ -250,6 +246,12 @@
         // NoopFilterRemover.
         NoopStoreRemover sRem = new NoopStoreRemover(plan);
         sRem.visit();
+
+        // check whether stream operator is present
+        // after MultiQueryOptimizer because it can shift streams from
+        // map to reduce, etc.
+        MRStreamHandler checker = new MRStreamHandler(plan);
+        checker.visit();
         
         return plan;
     }

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java?rev=764904&r1=764903&r2=764904&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java Tue Apr 14 17:57:56 2009
@@ -71,6 +71,7 @@
             for (Pair<POFilter, PhysicalPlan> pair: removalQ) {
                 removeFilter(pair.first, pair.second);
             }
+            removalQ.clear();
         }
         
         @Override
@@ -94,7 +95,15 @@
         private void removeFilter(POFilter filter, PhysicalPlan plan) {
             if (plan.size() > 1) {
                 try {
+                    List<PhysicalOperator> fInputs = filter.getInputs();
+                    List<PhysicalOperator> sucs = plan.getSuccessors(filter);
+
                     plan.removeAndReconnect(filter);
+                    if(sucs!=null && sucs.size()!=0){
+                        for (PhysicalOperator suc : sucs) {
+                            suc.setInputs(fInputs);
+                        }
+                    }
                 } catch (PlanException pe) {
                     log.info("Couldn't remove a filter in optimizer: "+pe.getMessage());
                 }

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=764904&r1=764903&r2=764904&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Tue Apr 14 17:57:56 2009
@@ -65,6 +65,7 @@
         for (PhysicalPlan plan : inpPlans) {
             pushWalker(mCurrentWalker.spawnChildWalker(plan));
             visit();
+            popWalker();
         }
     }
 
@@ -115,6 +116,7 @@
         for (PhysicalPlan plan : inpPlans) {
             pushWalker(mCurrentWalker.spawnChildWalker(plan));
             visit();
+            popWalker();
         }
 	}
     
@@ -258,6 +260,7 @@
         for (PhysicalPlan plan : inpPlans) {
             pushWalker(mCurrentWalker.spawnChildWalker(plan));
             visit();
+            popWalker();
         }
         
     }

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=764904&r1=764903&r2=764904&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java Tue Apr 14 17:57:56 2009
@@ -95,6 +95,8 @@
     
     private static Result empty = new Result(POStatus.STATUS_NULL, null);
     
+    private boolean inpEOP = false;
+    
     /**
      * Constructs an operator with the specified key
      * @param k the operator key
@@ -200,6 +202,12 @@
    
     @Override
     public Result getNext(Tuple t) throws ExecException {
+
+        if (this.parentPlan.endOfAllInput) {
+            
+            return getStreamCloseResult();         
+        
+        } 
         
         if (processedSet.cardinality() == myPlans.size()) {
             
@@ -258,5 +266,53 @@
         
         return res;
     }
+    
+    private Result getStreamCloseResult() throws ExecException {
+        Result res = null;
+        
+        while (true) {
+            
+            if (processedSet.cardinality() == myPlans.size()) {
+                Result inp = processInput();
+                if (inp.returnStatus == POStatus.STATUS_OK) {                
+                    Tuple tuple = (Tuple)inp.result;
+                    for (PhysicalPlan pl : myPlans) {
+                        pl.attachInput(tuple);
+                    }
+                    inpEOP = false;
+                } else if (inp.returnStatus == POStatus.STATUS_EOP){
+                    inpEOP = true;
+                } else if (inp.returnStatus == POStatus.STATUS_NULL) {
+                    inpEOP = false;
+                } else if (inp.returnStatus == POStatus.STATUS_ERR) {
+                    return inp;
+                }
+                processedSet.clear();
+            } 
+            
+            int idx = processedSet.nextClearBit(0);
+            PhysicalOperator leaf = myPlans.get(idx).getLeaves().get(0);
+            
+            res = leaf.getNext(dummyTuple);
+           
+            if (res.returnStatus == POStatus.STATUS_EOP)  {
+                processedSet.set(idx++);        
+                if (idx < myPlans.size()) {
+                    continue;
+                }
+            } else {
+                break;
+            }
+            
+            if (!inpEOP && res.returnStatus == POStatus.STATUS_EOP) {                   
+                continue;
+            } else {
+                break;
+            }
+        }
+        
+        return res;
+                
+    }
         
 }

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=764904&r1=764903&r2=764904&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Apr 14 17:57:56 2009
@@ -65,6 +65,7 @@
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.hadoop.fs.Path;
 
 public class QueryParser {
 	private PigContext pigContext;
@@ -213,6 +214,13 @@
             path = uri.getSchemeSpecificPart();
         }
 
+        if ((scheme == null && pigContext.getExecType() == ExecType.MAPREDUCE) ||
+            "hdfs".equalsIgnoreCase(scheme)) {
+            // We need to get the path from a hadoop path object,
+            // otherwise special glob characters could get removed.
+            path = new Path(fname).toUri().getPath();
+        }
+
         if (scheme == null || scheme.equals("file") || scheme.equals("hdfs")) {
             if (pigContext.getExecType() != ExecType.LOCAL) {
                 if (fname.startsWith(FileLocalizer.LOCAL_PREFIX)) {

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java?rev=764904&r1=764903&r2=764904&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java Tue Apr 14 17:57:56 2009
@@ -322,6 +322,7 @@
         
         if (batch) {
             setBatchOn();
+            mPigServer.setJobName(script);
             try {
                 loadScript(script, true, mLoadOnly, params, files);
                 executeBatch();

Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLoad.java?rev=764904&r1=764903&r2=764904&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLoad.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLoad.java Tue Apr 14 17:57:56 2009
@@ -132,7 +132,7 @@
 
     @Test
     public void testLoadRemoteRelScheme() throws Exception {
-        checkLoadPath("hdfs:test","/tmp/test");
+        checkLoadPath("test","/tmp/test");
     }
 
     @Test

Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java?rev=764904&r1=764903&r2=764904&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java Tue Apr 14 17:57:56 2009
@@ -217,7 +217,7 @@
 
     @Test
     public void testStoreRemoteRelScheme() throws Exception {
-        checkStorePath("hdfs:test","/tmp/test");
+        checkStorePath("test","/tmp/test");
     }
 
     @Test