Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/24 20:54:34 UTC

svn commit: r883836 [21/23] - in /hadoop/pig/branches/load-store-redesign: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/ contrib/zebra/ contrib/zebr...

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GFCross.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GFCross.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GFCross.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GFCross.java Tue Nov 24 19:54:19 2009
@@ -33,7 +33,7 @@
     BagFactory mBagFactory = BagFactory.getInstance();
     TupleFactory mTupleFactory = TupleFactory.getInstance();
     
-    public static int DEFAULT_PARALLELISM = 96;
+    public static final int DEFAULT_PARALLELISM = 96;
 
     @Override
     public DataBag exec(Tuple input) throws IOException {

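A note on the GFCross change: a public static field that is never reassigned should be declared final, otherwise any code in the JVM can silently overwrite it for every caller. A minimal sketch of the hazard, with hypothetical class names:

    // Hypothetical illustration, not from the patch.
    class Settings {
        public static int DEFAULT_PARALLELISM = 96;   // mutable: anyone may overwrite it
    }

    class Elsewhere {
        static void clobber() {
            Settings.DEFAULT_PARALLELISM = 1;   // compiles; changes it process-wide
        }
    }

Declaring the field final, as the patch does, turns the assignment in clobber() into a compile error.
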
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java Tue Nov 24 19:54:19 2009
@@ -391,7 +391,9 @@
             // TODO probably this should be replaced with the local file system
             File f = (new File(fileSpec)).getParentFile();
             if (f!=null){
-                f.mkdirs();
+                boolean res = f.mkdirs();
+                if (!res)
+                    log.warn("FileLocalizer.create: failed to create " + f);
             }
             
             return new FileOutputStream(fileSpec,append);

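File.mkdirs() reports failure through its boolean return value rather than by throwing, so ignoring the result hides problems such as missing permissions; the patch logs a warning instead. A minimal stand-alone sketch of the same pattern (the warning text is an assumption):

    import java.io.File;

    class MkdirsCheckSketch {
        static void ensureParentDir(String fileSpec) {
            File parent = new File(fileSpec).getParentFile();
            if (parent != null) {
                boolean created = parent.mkdirs();
                if (!created) {
                    // Note: mkdirs() also returns false when the directory already
                    // exists, so a warning rather than an exception fits here.
                    System.err.println("warning: failed to create " + parent);
                }
            }
        }
    }
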
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java Tue Nov 24 19:54:19 2009
@@ -57,7 +57,6 @@
         //get the attributes of cogroup that are modified during the translation
         
         MultiMap<LogicalOperator, LogicalPlan> mapGByPlans = cg.getGroupByPlans();
-
         for(LogicalOperator op: cg.getInputs()) {
             ArrayList<LogicalPlan> newGByPlans = new ArrayList<LogicalPlan>();
             for(LogicalPlan lp: mapGByPlans.get(op)) {
@@ -70,9 +69,41 @@
                     newGByPlans.add(lp);
                 }
             }
+            
+            
             mapGByPlans.removeKey(op);
             mapGByPlans.put(op, newGByPlans);
         }
+        
+        // check if after translation none of group by plans in a cogroup
+        // have a project(*) - if they still do it's because the input
+        // for the project(*) did not have a schema - in this case, we should
+        // error out since we could have different number/types of 
+        // cogroup keys
+        if(cg.getInputs().size() > 1) { // only for cogroups
+            for(LogicalOperator op: cg.getInputs()) {
+                for(LogicalPlan lp: mapGByPlans.get(op)) {
+                    if(checkPlanForProjectStar(lp)) {
+                        // not following Error handling guidelines to give error code
+                        // and error source since this will get swallowed by the parser
+                        // which will just return a ParseException
+                        throw new VisitorException("Cogroup/Group by * is only allowed if " +
+                        		"the input has a schema");
+                    }
+                }
+            }
+            // check if after translation all group by plans have same arity
+            int arity = mapGByPlans.get(cg.getInputs().get(0)).size();
+            for(LogicalOperator op: cg.getInputs()) {
+                if(arity != mapGByPlans.get(op).size()) {
+                    // not following Error handling guidelines to give error code
+                    // and error source since this will get swallowed by the parser
+                    // which will just return a ParseException
+                    throw new VisitorException("The arity of cogroup/group by columns " +
+                    		"does not match");
+                }
+            }
+        }
     }
     
     /* (non-Javadoc)

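The new checks run only when a statement has more than one input, i.e. for COGROUP: after star expansion, an input whose loader declared no schema leaves project(*) unresolved, and even resolvable inputs may expand to key lists of different lengths. A hedged sketch of scripts the checks reject, driven through PigServer ('input1'/'input2' are placeholder paths):

    import org.apache.pig.PigServer;

    class CogroupByStarSketch {
        static void run(PigServer pigServer) throws Exception {
            // Rejected: A has no schema, so * cannot expand into concrete group
            // keys; per the patch comment this surfaces as a ParseException.
            pigServer.registerQuery("A = load 'input1';");
            pigServer.registerQuery("B = load 'input2' as (id:int, v:chararray);");
            pigServer.registerQuery("C = cogroup A by *, B by (id, v);");

            // Rejected: both schemas are known, but * expands to two keys for A
            // and three for B, tripping the new arity check.
            pigServer.registerQuery("A = load 'input1' as (id:int, v:chararray);");
            pigServer.registerQuery("B = load 'input2' as (id:int, v:chararray, w:int);");
            pigServer.registerQuery("C = cogroup A by *, B by *;");
        }
    }
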
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java Tue Nov 24 19:54:19 2009
@@ -163,12 +163,14 @@
             
             // check if any of the foreach's peers is a foreach flatten
             // if so then this rule does not apply
-            for(LogicalOperator peer: peers) {
-                if(!peer.equals(foreach)) {
-                    if(peer instanceof LOForEach) {
-                        LOForEach peerForeach = (LOForEach)peer;
-                        if(peerForeach.hasFlatten().first) {
-                            return false;
+            if (peers != null){
+                for(LogicalOperator peer: peers) {
+                    if(!peer.equals(foreach)) {
+                        if(peer instanceof LOForEach) {
+                            LOForEach peerForeach = (LOForEach)peer;
+                            if(peerForeach.hasFlatten().first) {
+                                return false;
+                            }
                         }
                     }
                 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/StreamOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/StreamOptimizer.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/StreamOptimizer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/StreamOptimizer.java Tue Nov 24 19:54:19 2009
@@ -210,7 +210,7 @@
             LogicalOperator lo = nodes.get(0);
             if (lo == null || !(lo instanceof LOStream)) {
                 throw new RuntimeException("Expected stream, got " +
-                    lo.getClass().getName());
+                    (lo == null ? lo : lo.getClass().getName()));
             }
             LOStream stream = (LOStream)lo;
             if(mOptimizeLoad) {

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Nov 24 19:54:19 2009
@@ -173,15 +173,14 @@
 	static int undollar(String s) {
 		return Integer.parseInt(s.substring(1, s.length()));	
 	}
-	
-
+	    
     Path getCurrentDir(PigContext pigContext) throws IOException {
         DataStorage dfs = pigContext.getDfs();
         ContainerDescriptor desc = dfs.getActiveContainer();
         ElementDescriptor el = dfs.asElement(desc);
         return new Path(el.toString());
     }
-	
+
 	LogicalOperator parseCogroup(ArrayList<CogroupInput> gis, LogicalPlan lp, LOCogroup.GROUPTYPE type) throws ParseException, PlanException{
 		
 		log.trace("Entering parseCogroup");
@@ -237,15 +236,6 @@
 	 */
 	LogicalOperator parseJoin(ArrayList<CogroupInput> gis, LogicalPlan lp, LOJoin.JOINTYPE jt) throws ParseException, PlanException{
 		log.trace("Entering parseJoin");
-		// Skewed Join behaves as regular join in local mode
-		if (pigContext.getExecType() == ExecType.LOCAL && jt == LOJoin.JOINTYPE.SKEWED) {
-			return rewriteJoin(gis,lp);
-		}
-
-      // Merge Join behaves as regular join in local mode
-		if (pigContext.getExecType() == ExecType.LOCAL && jt == LOJoin.JOINTYPE.MERGE) {
-            return rewriteJoin(gis,lp);
-        }
         
 		int n = gis.size();
 
@@ -709,6 +699,56 @@
             
             return output.toString() ;
         }
+        
+        public static String join(AbstractCollection<String> s, String delimiter) {
+            if (s.isEmpty()) return "";
+            Iterator<String> iter = s.iterator();
+            StringBuffer buffer = new StringBuffer(iter.next());
+            while (iter.hasNext()) {
+                buffer.append(delimiter);
+                buffer.append(iter.next());
+            }
+            return buffer.toString();
+        }
+        
+            
+        public static String[] getPathStrings(String commaSeparatedPaths) {
+            int length = commaSeparatedPaths.length();
+            int curlyOpen = 0;
+            int pathStart = 0;
+            boolean globPattern = false;
+            List<String> pathStrings = new ArrayList<String>();
+        
+            for (int i=0; i<length; i++) {
+                char ch = commaSeparatedPaths.charAt(i);
+                switch(ch) {
+                    case '{' : {
+                        curlyOpen++;
+                        if (!globPattern) {
+                            globPattern = true;
+                        }
+                        break;
+                    }
+                    case '}' : {
+                        curlyOpen--;
+                        if (curlyOpen == 0 && globPattern) {
+                            globPattern = false;
+                        }
+                        break;
+                    }
+                    case ',' : {
+                        if (!globPattern) {
+                            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
+                            pathStart = i + 1 ;
+                        }
+                        break;
+                    }
+                }
+            }
+            pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
+        
+            return pathStrings.toArray(new String[0]);
+        }
 }
 
 class FunctionType {
@@ -879,19 +919,19 @@
 	)
 	{ 
 		if(null != root) {
-            log.debug("Adding " + root.getAlias() + " " + root + " to the lookup table " + aliases);
-
-            //Translate all the project(*) leaves in the plan to a sequence of projections
-            ProjectStarTranslator translate = new ProjectStarTranslator(lp);
-            translate.visit();
-
-            addLogicalPlan(root, lp);
-
             try {
-			    log.debug("Root: " + root.getClass().getName() + " schema: " + root.getSchema());
+                log.debug("Adding " + root.getAlias() + " " + root + " to the lookup table " + aliases);
+    
+                //Translate all the project(*) leaves in the plan to a sequence of projections
+                ProjectStarTranslator translate = new ProjectStarTranslator(lp);
+                translate.visit();
+    
+                addLogicalPlan(root, lp);
+            
+                log.debug("Root: " + root.getClass().getName() + " schema: " + root.getSchema());
             } catch(FrontendException fee) {
-            	ParseException pe = new ParseException(fee.getMessage());
-            	pe.initCause(fee);  
+                ParseException pe = new ParseException(fee.getMessage());
+                pe.initCause(fee);  
                 throw pe;
             }
 		}
@@ -1157,7 +1197,14 @@
     )
 |   (<STORE> op = StoreClause(lp))
 	)
-    [<PARALLEL> t2=<INTEGER> { op.setRequestedParallelism(Integer.parseInt(t2.image));} ]
+    [<PARALLEL> t2=<INTEGER> {
+      // In Local Mode we can only use one reducer
+    	if( this.pigContext.getExecType() == ExecType.LOCAL ) {
+    		op.setRequestedParallelism(1);
+    	} else {
+    		op.setRequestedParallelism(Integer.parseInt(t2.image));
+    	}
+    } ]
 	)	
 	{log.trace("Exiting BaseExpr"); return op;}
 }
@@ -1944,21 +1991,18 @@
 	// For all types of join we create LOJoin and mark what type of join it is.
 	(
 		[<USING> ("\"replicated\"" { 
-	                if(isOuter) {
-	                    throw new ParseException("Replicated join does not support (left|right|full) outer joins");
-	                }
+	          if(isFullOuter || isRightOuter) {
+	              throw new ParseException("Replicated join does not support (right|full) outer joins");
+	          }
 				    frj = parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED); 
 			    }
 			| "\"repl\"" { 
-				    if(isOuter) {
-                        throw new ParseException("Replicated join does not support (left|right|full) outer joins");
-                    }
+				    if(isFullOuter || isRightOuter) {
+	                    throw new ParseException("Replicated join does not support (right|full) outer joins");
+	          }
 				    frj=parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED);
 				}
-		    |"\"skewed\"" { 
-		    	    if(isOuter) {
-                        throw new ParseException("Skewed join does not support (left|right|full) outer joins");
-                    }
+		    |"\"skewed\"" {
 		    	    skj = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED); 
 		    	}
 		    |"\"merge\"" { 

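Among the helpers added above, getPathStrings() splits a comma-separated path list while leaving commas inside {...} globs untouched; the curlyOpen counter tracks brace nesting so only top-level commas act as separators. A small driver showing the intended behavior (assuming the helper is visible from the calling scope):

    class PathSplitDemo {
        public static void main(String[] args) {
            for (String p : getPathStrings("/data/a,/data/{b,c},/data/d")) {
                System.out.println(p);
            }
            // prints:
            //   /data/a
            //   /data/{b,c}   <- the comma inside the glob is not a separator
            //   /data/d
        }
    }
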
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Tue Nov 24 19:54:19 2009
@@ -116,7 +116,7 @@
          * request. In order to ensure unique and consistent names, across
          * all field schema objects, the object is made static.
          */
-        public static CanonicalNamer canonicalNamer = new CanonicalNamer();
+        public static final CanonicalNamer canonicalNamer = new CanonicalNamer();
         
         private static Log log = LogFactory.getLog(Schema.FieldSchema.class);
 

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Tue Nov 24 19:54:19 2009
@@ -81,7 +81,7 @@
     
     private String currentAlias = null;
     
-    public static MultiMap<Byte, Byte> castLookup = new MultiMap<Byte, Byte>();
+    public static final MultiMap<Byte, Byte> castLookup = new MultiMap<Byte, Byte>();
     static{
         //Ordering here decides the score for the best fit function.
         //Do not change the order. Conversions to a smaller type is preferred

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/Operator.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/Operator.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/Operator.java Tue Nov 24 19:54:19 2009
@@ -94,13 +94,10 @@
      */
     @Override
     public boolean equals(Object obj) {
-        if(obj instanceof Operator){
-            Operator opObj = (Operator)obj;
-            if(obj==this)
-                return true;
-            return mKey.equals(opObj);
-        }
-        return false;
+        if(obj==this)
+            return true;
+        else
+            return false;
     }
     
     /**

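The old body compared mKey, an OperatorKey, against opObj, an Operator, so mKey.equals(opObj) could never be true for two distinct operators; the rewrite makes the de-facto reference-equality semantics explicit. The underlying pitfall in isolation:

    class EqualsTypeMismatchDemo {
        public static void main(String[] args) {
            Object key = Integer.valueOf(42);   // stands in for the OperatorKey
            Object op  = "an operator";         // stands in for the Operator
            // equals() across unrelated types is always false regardless of
            // contents, which is all the old body could ever compute.
            System.out.println(key.equals(op)); // false
        }
    }

Reference equality (obj == this) also trivially satisfies the equals/hashCode contract, since an object always equals itself.
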
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/OperatorPlan.java Tue Nov 24 19:54:19 2009
@@ -74,7 +74,7 @@
 
     private List<E> mRoots;
     private List<E> mLeaves;
-    protected static Log log = LogFactory.getLog(OperatorPlan.class);
+    protected static final Log log = LogFactory.getLog(OperatorPlan.class);
     
     public OperatorPlan() {
         mRoots = new ArrayList<E>();

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/ExecutableManager.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/ExecutableManager.java Tue Nov 24 19:54:19 2009
@@ -347,12 +347,12 @@
                     // We should receive an EOP only when *ALL* input
                     // for this process has already been sent and no
                     // more input is expected
-                    if (inp.returnStatus == POStatus.STATUS_EOP) {
+                    if (inp != null && inp.returnStatus == POStatus.STATUS_EOP) {
                         // signal cleanup in ExecutableManager
                         close();
                         return;
                     }
-                    if (inp.returnStatus == POStatus.STATUS_OK) {
+                    if (inp != null && inp.returnStatus == POStatus.STATUS_OK) {
                         // Check if there was a problem with the managed process
                         if (outerrThreadsError != null) {
                             throw new IOException(

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/StreamingCommand.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/StreamingCommand.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/StreamingCommand.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/StreamingCommand.java Tue Nov 24 19:54:19 2009
@@ -502,10 +502,16 @@
         }
         
         public boolean equals(Object obj) {
-          HandleSpec other = (HandleSpec)obj;
-          return (other != null && name.equals(other.name) && spec.equals(other.spec));
+            if (obj instanceof HandleSpec){
+                HandleSpec other = (HandleSpec)obj;
+                return (other != null && name.equals(other.name) && spec.equals(other.spec));
+            } else 
+                return false;
         }
 
+        public int hashCode() {
+            return name.hashCode();
+        }
 
         public Object clone() {
           try {

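HandleSpec now pairs equals() with a hashCode(), which hashed collections require: objects that compare equal must report the same hash code, or HashMap/HashSet lookups can miss them. A compact illustration of the contract, using the same name/spec fields as the patch:

    import java.util.HashSet;
    import java.util.Set;

    class Spec {
        final String name, spec;
        Spec(String name, String spec) { this.name = name; this.spec = spec; }

        @Override public boolean equals(Object obj) {
            if (!(obj instanceof Spec)) return false;
            Spec other = (Spec) obj;
            return name.equals(other.name) && spec.equals(other.spec);
        }

        // Hash on name only, as the patch does: equal objects share a hash code,
        // which is all the contract demands (collisions are permitted).
        @Override public int hashCode() { return name.hashCode(); }
    }

    class SpecDemo {
        public static void main(String[] args) {
            Set<Spec> handles = new HashSet<Spec>();
            handles.add(new Spec("stdin", "cat"));
            // Without the hashCode() override this lookup could silently miss.
            System.out.println(handles.contains(new Spec("stdin", "cat"))); // true
        }
    }
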
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/CompilerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/CompilerUtils.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/CompilerUtils.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/CompilerUtils.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.builtin.IsEmpty;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+
+/* 
+ * A class to hold utility functions used by LogToPhyTranslator and MRCompiler
+ * 
+ */
+public class CompilerUtils {
+
+    public static void addEmptyBagOuterJoin(PhysicalPlan fePlan, Schema inputSchema) throws PlanException {
+        // we currently have POProject[bag] as the only operator in the plan
+        // If the bag is an empty bag, we should replace
+        // it with a bag with one tuple with null fields so that when we flatten
+        // we do not drop records (flatten will drop records if the bag is left
+        // as an empty bag) and actually project nulls for the fields in 
+        // the empty bag
+        
+        // So we need to get to the following state:
+        // POProject[Bag]
+        //         \     
+        //    POUserFunc["IsEmpty()"] Const[Bag](bag with null fields)   
+        //                        \      |    POProject[Bag]             
+        //                         \     |    /
+        //                          POBinCond
+        POProject relationProject = (POProject) fePlan.getRoots().get(0);
+        try {
+            
+            // condition of the bincond
+            POProject relationProjectForIsEmpty = relationProject.clone();
+            fePlan.add(relationProjectForIsEmpty);
+            String scope = relationProject.getOperatorKey().scope;
+            FuncSpec isEmptySpec = new FuncSpec(IsEmpty.class.getName());
+            Object f = PigContext.instantiateFuncFromSpec(isEmptySpec);
+            POUserFunc isEmpty = new POUserFunc(new OperatorKey(scope, NodeIdGenerator.getGenerator().
+                        getNextNodeId(scope)), -1, null, isEmptySpec, (EvalFunc) f);
+            isEmpty.setResultType(DataType.BOOLEAN);
+            fePlan.add(isEmpty);
+            fePlan.connect(relationProjectForIsEmpty, isEmpty);
+            
+            // lhs of bincond (const bag with null fields)
+            ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,
+                    NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+            // the following should give a tuple with the
+            // required number of nulls
+            Tuple t = TupleFactory.getInstance().newTuple(inputSchema.size());
+            for(int i = 0; i < inputSchema.size(); i++) {
+                t.set(i, null);
+            }
+            List<Tuple> bagContents = new ArrayList<Tuple>(1);
+            bagContents.add(t);
+            DataBag bg = new NonSpillableDataBag(bagContents);
+            ce.setValue(bg);
+            ce.setResultType(DataType.BAG);
+            //this operator doesn't have any predecessors
+            fePlan.add(ce);
+            
+            //rhs of bincond is the original project
+            // let's set up the bincond now
+            POBinCond bincond = new POBinCond(new OperatorKey(scope,
+                    NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+            bincond.setCond(isEmpty);
+            bincond.setLhs(ce);
+            bincond.setRhs(relationProject);
+            bincond.setResultType(DataType.BAG);
+            fePlan.add(bincond);
+
+            fePlan.connect(isEmpty, bincond);
+            fePlan.connect(ce, bincond);
+            fePlan.connect(relationProject, bincond);
+
+        } catch (Exception e) {
+            throw new PlanException("Error setting up outerjoin", e);
+        }
+    	
+    }
+
+}

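The plan wired up by addEmptyBagOuterJoin is, in effect, a ternary over the joined bag: if the bag contributed by the other relation is empty, substitute a bag holding one all-null tuple so the later flatten still emits a row. A conceptual stand-in for that POBinCond (hypothetical helper, not part of the patch):

    import java.util.Collections;

    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    class OuterJoinNullPaddingSketch {
        static DataBag padIfEmpty(DataBag bag, int schemaWidth) {
            if (bag.size() > 0) {
                return bag;                 // real matches: keep them as-is
            }
            // newTuple(n) initializes all n fields to null.
            Tuple nulls = TupleFactory.getInstance().newTuple(schemaWidth);
            return BagFactory.getInstance()
                             .newDefaultBag(Collections.singletonList(nulls));
        }
    }

Flattening padIfEmpty(bag, width) then emits either the matching rows or a single all-null row, which is the outer-join behavior the comment block describes.
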
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/JarManager.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/JarManager.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/JarManager.java Tue Nov 24 19:54:19 2009
@@ -120,10 +120,9 @@
             // log.error("Adding extra " + pigContext.extraJars.get(i));
             mergeJar(jarFile, pigContext.extraJars.get(i), null, contents);
         }
-        if (pigContext != null) {
-            jarFile.putNextEntry(new ZipEntry("pigContext"));
-            new ObjectOutputStream(jarFile).writeObject(pigContext);
-        }
+
+        jarFile.putNextEntry(new ZipEntry("pigContext"));
+        new ObjectOutputStream(jarFile).writeObject(pigContext);
         jarFile.close();
     }
 

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/UDFContext.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/UDFContext.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/UDFContext.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/UDFContext.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.util;
+
+import java.io.IOException;
+//import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.pig.impl.util.ObjectSerializer;
+
+public class UDFContext {
+    
+    private Configuration jconf = null;
+    private HashMap<Integer, Properties> udfConfs;
+
+    private static UDFContext self = null;
+
+    private UDFContext() {
+        udfConfs = new HashMap<Integer, Properties>();
+    }
+
+    public static UDFContext getUDFContext() {
+        if (self == null) {
+            self = new UDFContext();
+        }
+        return self;
+    }
+
+    /**
+     * Adds the JobConf to this singleton.  Will be 
+     * called on the backend by the Map and Reduce 
+     * functions so that UDFs can obtain the JobConf
+     * on the backend.
+     */
+    public void addJobConf(Configuration conf) {
+        jconf = conf;
+    }
+
+    /**
+     * Get the JobConf.  This should only be called on
+     * the backend.  It will return null on the frontend.
+     * @return JobConf for this job.  This is a copy of the
+     * JobConf.  Nothing written here will be kept by the system.
+     * getUDFProperties should be used for recording UDF specific
+     * information.
+     */
+    public Configuration getJobConf() {
+        if (jconf != null)  return new Configuration(jconf);
+        else return null;
+    }
+
+    /**
+     * Get a properties object that is specific to this UDF.
+     * Note that if a given UDF is called multiple times in a script, 
+     * and each instance passes different arguments, then each will
+     * be provided with a different configuration object.
+     * This can be used by loaders to pass their input object path
+     * or URI and separate themselves from other instances of the
+     * same loader.  Constructor arguments could also be used,
+     * as they are available on both the front and back end.
+     *
+     * Note that this can only be used to share information
+     * across instantiations of the same function in the front end
+     * and between front end and back end.  It cannot be used to
+     * share information between instantiations (that is, between
+     * map and/or reduce instances) on the back end at runtime.
+     * @param c class of the UDF obtaining the properties object.
+     * @param args String arguments that make this instance of
+     * the UDF unique.
+     * @return A reference to the properties object specific to
+     * the calling UDF.  This is a reference, not a copy.
+     * Any changes to this object will automatically be 
+     * propagated to other instances of the UDF calling this 
+     * function.
+     */
+    
+    @SuppressWarnings("unchecked")
+    public Properties getUDFProperties(Class c, String[] args) {
+        Integer k = generateKey(c, args);
+        Properties p = udfConfs.get(k);
+        if (p == null) {
+            p = new Properties();
+            udfConfs.put(k, p);
+        }
+        return p;
+    }
+    
+     /**
+     * Get a properties object that is specific to this UDF.
+     * Note that if a given UDF is called multiple times in a script, 
+     * they will all be provided the same configuration object.  It
+     * is up to the UDF to make sure the multiple instances do not
+     * stomp on each other.
+     *
+     * It is guaranteed that this properties object will be separate
+     * from that provided to any other UDF.
+     *
+     * Note that this can only be used to share information
+     * across instantiations of the same function in the front end
+     * and between front end and back end.  It cannot be used to
+     * share information between instantiations (that is, between
+     * map and/or reduce instances) on the back end at runtime.
+     * @param c class of the UDF obtaining the properties object.
+     * @return A reference to the properties object specific to
+     * the calling UDF.  This is a reference, not a copy.
+     * Any changes to this object will automatically be 
+     * propagated to other instances of the UDF calling this 
+     * function.
+     */
+    @SuppressWarnings("unchecked")
+    public Properties getUDFProperties(Class c) {
+        Integer k = generateKey(c);
+        Properties p = udfConfs.get(k);
+        if (p == null) {
+            p = new Properties();
+            udfConfs.put(k, p);
+        }
+        return p;
+    }
+    
+    /**
+     * Serialize the UDF specific information into an instance
+     * of JobConf.  This function is intended to be called on
+     * the front end in preparation for sending the data to the
+     * backend.
+     * @param conf JobConf to serialize into
+     * @throws IOException if underlying serialization throws it
+     */
+    public void serialize(Configuration conf) throws IOException {
+        conf.set("pig.UDFContext", ObjectSerializer.serialize(udfConfs));
+    }
+    
+    /**
+     * Populate the udfConfs field.  This function is intended to
+     * be called by Map.configure or Reduce.configure on the backend.
+     * It assumes that addJobConf has already been called.
+     * @throws IOException if underlying deserialization throws it
+     */
+    @SuppressWarnings("unchecked")
+    public void deserialize() throws IOException {  
+        udfConfs = (HashMap<Integer, Properties>)ObjectSerializer.deserialize(jconf.get("pig.UDFContext"));
+    }
+    
+    @SuppressWarnings("unchecked")
+    private int generateKey(Class c) {
+        return c.getName().hashCode();
+    }
+    
+    @SuppressWarnings("unchecked")
+    private int generateKey(Class c, String[] args) {
+        int hc = c.getName().hashCode();
+        for (int i = 0; i < args.length; i++) {
+            hc <<= 1;
+            hc ^= args[i].hashCode();
+        }
+        return hc;
+    }
+    
+}
\ No newline at end of file

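UDFContext is a front-end/back-end bridge: a UDF records settings in its per-class Properties on the client, Pig serializes the whole map into the job configuration, and the same UDF reads the values back inside map/reduce tasks. A hedged sketch of a UDF using it (class name and property key are hypothetical):

    import java.io.IOException;
    import java.util.Properties;

    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.util.UDFContext;

    public class TaggedLength extends EvalFunc<Integer> {
        private final String tag;

        public TaggedLength(String tag) {
            this.tag = tag;
            // Front end: stash per-instance state, keyed by class + ctor args so
            // two instances constructed with different tags do not collide.
            Properties p = UDFContext.getUDFContext()
                    .getUDFProperties(getClass(), new String[] { tag });
            p.setProperty("my.tag", tag);
        }

        @Override
        public Integer exec(Tuple input) throws IOException {
            // Back end: once UDFContext.deserialize() has run in the task, the
            // same Properties object carries the value set on the front end.
            Properties p = UDFContext.getUDFContext()
                    .getUDFProperties(getClass(), new String[] { tag });
            String t = p.getProperty("my.tag");
            if (input == null || input.size() == 0) return null;
            return (t + input.get(0)).length();
        }
    }
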
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigstats/PigStats.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigstats/PigStats.java Tue Nov 24 19:54:19 2009
@@ -18,8 +18,11 @@
 
 package org.apache.pig.tools.pigstats;
 
+import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
@@ -41,7 +44,6 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter;
 import org.apache.pig.impl.util.ObjectSerializer;
 
 public class PigStats {
@@ -54,6 +56,8 @@
     ArrayList<String> rootJobIDs = new ArrayList<String>();
     ExecType mode;
     
+    private static final String localModeDataFile = "part-00000";
+    
     public void setMROperatorPlan(MROperPlan mrp) {
         this.mrp = mrp;
     }
@@ -99,11 +103,25 @@
         //The counter placed before a store in the local plan should be able to get the number of records
         for(PhysicalOperator op : php.getLeaves()) {
             Map<String, String> jobStats = new HashMap<String, String>();
-            stats.put(op.toString(), jobStats);
-            POCounter counter = (POCounter) php.getPredecessors(op).get(0);
-            jobStats.put("PIG_STATS_LOCAL_OUTPUT_RECORDS", (Long.valueOf(counter.getCount())).toString());
+            stats.put(op.toString(), jobStats);         
             String localFilePath=normalizeToLocalFilePath(((POStore)op).getSFile().getFileName());
-            jobStats.put("PIG_STATS_LOCAL_BYTES_WRITTEN", (Long.valueOf(new File(localFilePath).length())).toString());
+            File outputFile = new File( localFilePath + File.separator + localModeDataFile );
+            
+            long lineCounter = 0;
+            try {
+                BufferedReader in = new BufferedReader(new FileReader( outputFile ));
+                @SuppressWarnings("unused")
+                String tmpString = null;
+                while( (tmpString = in.readLine()) != null ) {
+                    lineCounter++;
+                }
+                in.close();
+            } catch (FileNotFoundException e) {
+            } catch (IOException e) {                
+            } finally {
+                jobStats.put("PIG_STATS_LOCAL_OUTPUT_RECORDS", (Long.valueOf(lineCounter)).toString());
+            }            
+            jobStats.put("PIG_STATS_LOCAL_BYTES_WRITTEN", (Long.valueOf(outputFile.length())).toString());
         }
         return stats;
     }
@@ -266,10 +284,10 @@
     }
     
     public long getBytesWritten() {
-    	if(mode == ExecType.LOCAL) {
-    		return getLocalBytesWritten();
-    	} else if(mode == ExecType.MAPREDUCE) {
-    		return getMapReduceBytesWritten();
+        if(mode == ExecType.LOCAL) {           
+            return getLocalBytesWritten(); 
+    	} else if( mode == ExecType.MAPREDUCE ) {
+    	    return getMapReduceBytesWritten();
     	} else {
     		throw new RuntimeException("Unrecognized mode. Either MapReduce or Local mode expected.");
     	}

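With the local-execution-engine POCounter gone, local-mode record counts are now recovered by re-reading the part file and counting lines. A stand-alone sketch of the same counting idiom, with the reader closed in a finally block (the part-00000 layout follows the patch's constant):

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;

    class LineCountSketch {
        static long countLines(File outputFile) {
            long lines = 0;
            BufferedReader in = null;
            try {
                in = new BufferedReader(new FileReader(outputFile));
                while (in.readLine() != null) {
                    lines++;
                }
            } catch (IOException e) {
                // Matches the patch's behavior: an unreadable file simply
                // reports the lines counted so far (possibly 0).
            } finally {
                if (in != null) {
                    try { in.close(); } catch (IOException ignored) { }
                }
            }
            return lines;
        }
    }
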
Modified: hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml (original)
+++ hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml Tue Nov 24 19:54:19 2009
@@ -37,6 +37,9 @@
         <Bug pattern="EI_EXPOSE_REP2" />
     </Match>
     <Match>
+        <Bug pattern="DP_CREATE_CLASSLOADER_INSIDE_DO_PRIVILEGED" />
+    </Match>
+    <Match>
         <Class name="org.apache.pig.tools.parameters.Token" />
     </Match>
     <Match>
@@ -138,14 +141,43 @@
         <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED" />
     </Match>
     <Match>
+        <Class name="org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce" />
+        <Field name="sJobConf" />
+        <Bug pattern="MS_CANNOT_BE_FINAL" />
+    </Match>
+    <Match>
+        <Class name="org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator" />
+        <Field name="reporter" />
+        <Bug pattern="MS_CANNOT_BE_FINAL" />
+    </Match>
+    <Match>
+        <Class name="org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator" />
+        <Field name="pigLogger" />
+        <Bug pattern="MS_PKGPROTECT" />
+    </Match>
+    <Match>
+        <Class name="org.apache.pig.impl.logicalLayer.LogicalPlanCloneHelper" />
+        <Field name="mOpToCloneMap" />
+        <Bug pattern="MS_PKGPROTECT" />
+    </Match>
+    <Match>
         <Class name="org.apache.pig.impl.util.SpillableMemoryManager" />
         <Bug pattern="DM_GC" />
     </Match>
     <Match>
+        <Class name="org.apache.pig.impl.logicalLayer.LogicalPlanBuilder" />
+        <Field name="classloader" />
+        <Bug pattern="MS_PKGPROTECT" />
+    </Match>
+    <Match>
         <Class name="org.apache.pig.data.DistinctDataBag$DistinctDataBagIterator$TContainer" />
         <Bug pattern="SIC_INNER_SHOULD_BE_STATIC" />
     </Match>
     <Match>
+        <Class name="org.apache.pig.data.InternalDistinctBag$DistinctDataBagIterator$TContainer" />
+        <Bug pattern="SIC_INNER_SHOULD_BE_STATIC" />
+    </Match>
+    <Match>
         <Bug pattern="BC_BAD_CAST_TO_CONCRETE_COLLECTION" />
     </Match>
     <!-- This Tuple classes are not used -->
@@ -236,5 +268,61 @@
         <Method name = "sendMTFValues" />
         <Bug pattern="IM_BAD_CHECK_FOR_ODD" />
     </Match>
+    <Match>
+        <Class name="org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger" />
+        <Bug pattern="UG_SYNC_SET_UNSYNC_GET" />
+    </Match>
+    <Match>
+        <Class name="org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream" />
+        <Bug pattern="IS2_INCONSISTENT_SYNC" />
+    </Match>
+    <Match>
+        <Class name="org.apache.pig.impl.builtin.DefaultIndexableLoader" />
+        <Bug pattern="UWF_NULL_FIELD" />
+    </Match>
+    <Match>
+        <Class name="org.apache.pig.impl.builtin.MergeJoinIndexer" />
+        <Bug pattern="UWF_NULL_FIELD" />
+    </Match>
+    <Match>
+        <Class name="org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler" />
+        <Bug pattern="NM_WRONG_PACKAGE" />
+    </Match>
+    <Match>
+        <Class name="org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NoopStoreRemover$PhysicalRemover" />
+        <Bug pattern="NM_WRONG_PACKAGE" />
+    </Match>
+    <Match>
+        <Class name="org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter" />
+        <Bug pattern="NM_WRONG_PACKAGE" />
+    </Match>
+    <Match>
+        <Class name="org.apache.pig.impl.logicalLayer.schema.Schema" />
+        <Method name = "equals" />
+        <Bug pattern="NP_NULL_ON_SOME_PATH" />
+    </Match>
+    <Match>
+        <Class name="org.apache.pig.impl.logicalLayer.schema.Schema$FieldSchema" />
+        <Method name = "equals" />
+        <Bug pattern="NP_NULL_ON_SOME_PATH" />
+    </Match>
+    <Match>
+        <Class name="org.apache.pig.impl.logicalLayer.optimizer.StreamOptimizer" />
+        <Method name = "transform" />
+        <Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE" />
+    </Match>
+    <Match>
+        <Class name="org.apache.pig.impl.logicalLayer.LogicalPlanBuilder" />
+        <Field name = "classloader" />
+        <Bug pattern="MS_CANNOT_BE_FINAL" />
+    </Match>
+    <Match>
+        <Bug pattern="RV_ABSOLUTE_VALUE_OF_HASHCODE" />
+    </Match>
+    <Match>
+        <Class name="org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression" />
+        <Field name = "res" />
+        <Bug pattern="MF_CLASS_MASKS_FIELD" />
+    </Match>
     
 </FindBugsFilter>

Added: hadoop/pig/branches/load-store-redesign/test/hbase-site.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/hbase-site.xml?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/hbase-site.xml (added)
+++ hadoop/pig/branches/load-store-redesign/test/hbase-site.xml Tue Nov 24 19:54:19 2009
@@ -0,0 +1,137 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+  <property>
+    <name>hbase.regionserver.msginterval</name>
+    <value>1000</value>
+    <description>Interval between messages from the RegionServer to HMaster
+    in milliseconds.  Default is 15. Set this value low if you want unit
+    tests to be responsive.
+    </description>
+  </property>
+  <property>
+    <name>hbase.client.pause</name>
+    <value>5000</value>
+    <description>General client pause value.  Used mostly as value to wait
+    before running a retry of a failed get, region lookup, etc.</description>
+  </property>
+  <property>
+    <name>hbase.master.meta.thread.rescanfrequency</name>
+    <value>10000</value>
+    <description>How long the HMaster sleeps (in milliseconds) between scans of
+    the root and meta tables.
+    </description>
+  </property>
+  <property>
+    <name>hbase.server.thread.wakefrequency</name>
+    <value>1000</value>
+    <description>Time to sleep in between searches for work (in milliseconds).
+    Used as sleep interval by service threads such as META scanner and log roller.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.handler.count</name>
+    <value>5</value>
+    <description>Count of RPC Server instances spun up on RegionServers
+    Same property is used by the HMaster for count of master handlers.
+    Default is 10.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.lease.period</name>
+    <value>6000</value>
+    <description>Length of time the master will wait before timing out a region
+    server lease. Since region servers report in every second (see above), this
+    value has been reduced so that the master will notice a dead region server
+    sooner. The default is 30 seconds.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.info.port</name>
+    <value>-1</value>
+    <description>The port for the hbase master web UI
+    Set to -1 if you do not want the info server to run.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.info.port</name>
+    <value>-1</value>
+    <description>The port for the hbase regionserver web UI
+    Set to -1 if you do not want the info server to run.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.info.port.auto</name>
+    <value>true</value>
+    <description>Info server auto port bind. Enables automatic port
+    search if hbase.regionserver.info.port is already in use.
+    Enabled for testing to run multiple tests on one machine.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.lease.thread.wakefrequency</name>
+    <value>3000</value>
+    <description>The interval between checks for expired region server leases.
+    This value has been reduced due to the other reduced values above so that
+    the master will notice a dead region server sooner. The default is 15 seconds.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.optionalcacheflushinterval</name>
+    <value>10000</value>
+    <description>
+    Amount of time to wait since the last time a region was flushed before
+    invoking an optional cache flush. Default 60,000.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.safemode</name>
+    <value>false</value>
+    <description>
+    Turn on/off safe mode in region server. Always on for production, always off
+    for tests.
+    </description>
+  </property>
+  <property>
+    <name>hbase.hregion.max.filesize</name>
+    <value>67108864</value>
+    <description>
+    Maximum desired file size for an HRegion.  If filesize exceeds
+    value + (value / 2), the HRegion is split in two.  Default: 256M.
+
+    Keep the maximum filesize small so we split more often in tests.
+    </description>
+  </property>
+  <property>
+    <name>hadoop.log.dir</name>
+    <value>${user.dir}/../logs</value>
+  </property>
+  <property>
+    <name>hbase.zookeeper.property.clientPort</name>
+    <value>21810</value>
+    <description>Property from ZooKeeper's config zoo.cfg.
+    The port at which the clients will connect.
+    </description>
+  </property>
+</configuration>

Added: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestAccumulator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestAccumulator.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestAccumulator.java (added)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestAccumulator.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.Iterator;
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.After;
+import org.junit.Before;
+
+public class TestAccumulator extends TestCase{
+    private static final String INPUT_FILE = "AccumulatorInput.txt";
+    private static final String INPUT_FILE2 = "AccumulatorInput2.txt";
+    private static final String INPUT_FILE3 = "AccumulatorInput3.txt";
+ 
+    private PigServer pigServer;
+    private MiniCluster cluster = MiniCluster.buildCluster();
+    
+    public TestAccumulator() throws ExecException, IOException{
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        // pigServer = new PigServer(ExecType.LOCAL);
+        pigServer.getPigContext().getProperties().setProperty("pig.accumulative.batchsize", "2");     
+        pigServer.getPigContext().getProperties().setProperty("pig.exec.batchsize", "2");
+    }
+    
+    @Before
+    public void setUp() throws Exception {
+        createFiles();
+    }
+
+    private void createFiles() throws IOException {
+        PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+                
+        w.println("100\tapple");    	    	
+        w.println("200\torange");    	
+        w.println("300\tstrawberry");    	
+        w.println("300\tpear");
+        w.println("100\tapple");
+        w.println("300\tpear");
+        w.println("400\tapple");    
+        w.close();   
+        
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+        
+        w = new PrintWriter(new FileWriter(INPUT_FILE2));
+        
+        w.println("100\t");    	
+        w.println("100\t");
+        w.println("200\t");    	
+        w.println("200\t");    	
+        w.println("300\tstrawberry");
+        w.close();   
+        
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2);
+        
+        w = new PrintWriter(new FileWriter(INPUT_FILE3));
+        
+        w.println("100\t1.0");    	
+        w.println("100\t2.0");
+        w.println("200\t1.1");    	
+        w.println("200\t2.1");
+        w.println("100\t3.0");    	
+        w.println("100\t4.0");
+        w.println("200\t3.1");
+        w.println("100\t5.0");
+        w.println("300\t3.3");
+        w.close();   
+        
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        new File(INPUT_FILE).delete();
+        Util.deleteFile(cluster, INPUT_FILE);
+        new File(INPUT_FILE2).delete();
+        Util.deleteFile(cluster, INPUT_FILE2);
+        new File(INPUT_FILE3).delete();
+        Util.deleteFile(cluster, INPUT_FILE3);
+    }
+    
+    
+    public void testAccumBasic() throws IOException{
+        // test group by
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group,  org.apache.pig.test.utils.AccumulatorBagCount(A);");                     
+
+        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+        expected.put(100, 2);
+        expected.put(200, 1);
+        expected.put(300, 3);
+        expected.put(400, 1);
+        
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));                
+        }            
+        
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group,  " +
+                "org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.BagCount(A);");                     
+        
+        try{
+            iter = pigServer.openIterator("C");
+        
+            while(iter.hasNext()) {
+                Tuple t = iter.next();
+                assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));                
+            }      
+            fail("accumulator should not be called.");
+        }catch(IOException e) {
+            // should throw exception from AccumulatorBagCount.
+        }
+        
+        // test cogroup
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("B = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("C = cogroup A by id, B by id;");
+        pigServer.registerQuery("D = foreach C generate group,  " +
+                "org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.AccumulatorBagCount(B);");                     
+
+        HashMap<Integer, String> expected2 = new HashMap<Integer, String>();
+        expected2.put(100, "2,2");
+        expected2.put(200, "1,1");
+        expected2.put(300, "3,3");
+        expected2.put(400, "1,1");
+        
+                  
+        iter = pigServer.openIterator("D");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected2.get((Integer)t.get(0)), t.get(1).toString()+","+t.get(2).toString());                
+        }            
+    }      
+    
+    public void testAccumWithNegative() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group,  -org.apache.pig.test.utils.AccumulatorBagCount(A);");                     
+
+        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+        expected.put(100, -2);
+        expected.put(200, -1);
+        expected.put(300, -3);
+        expected.put(400, -1);
+        
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));                
+        }            
+    }
+    
+    public void testAccumWithAdd() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group,  org.apache.pig.test.utils.AccumulatorBagCount(A)+1.0;");                     
+        
+        {
+            HashMap<Integer, Double> expected = new HashMap<Integer, Double>();
+            expected.put(100, 3.0);
+            expected.put(200, 2.0);
+            expected.put(300, 4.0);
+            expected.put(400, 2.0);
+            
+                      
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+            
+            while(iter.hasNext()) {
+                Tuple t = iter.next();
+                assertEquals(expected.get((Integer)t.get(0)), (Double)t.get(1));                
+            }                            
+        }
+        
+        {
+            pigServer.registerQuery("C = foreach B generate group,  " +
+            "org.apache.pig.test.utils.AccumulatorBagCount(A)+org.apache.pig.test.utils.AccumulatorBagCount(A);");                     
+
+            HashMap<Integer, Integer>expected = new HashMap<Integer, Integer>();
+            expected.put(100, 4);
+            expected.put(200, 2);
+            expected.put(300, 6);
+            expected.put(400, 2);
+    
+              
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+    
+            while(iter.hasNext()) {
+                Tuple t = iter.next();
+                assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));                
+            }
+        }
+    }      
+    
+    public void testAccumWithMinus() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group, " +
+                " org.apache.pig.test.utils.AccumulatorBagCount(A)*3.0-org.apache.pig.test.utils.AccumulatorBagCount(A);");                     
+
+        HashMap<Integer, Double> expected = new HashMap<Integer, Double>();
+        expected.put(100, 4.0);
+        expected.put(200, 2.0);
+        expected.put(300, 6.0);
+        expected.put(400, 2.0);
+        
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (Double)t.get(1));                
+        }                                   
+    }              
+    
+    public void testAccumWithMod() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group,  " +
+                "org.apache.pig.test.utils.AccumulatorBagCount(A) % 2;");                     
+
+        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+        expected.put(100, 0);
+        expected.put(200, 1);
+        expected.put(300, 1);
+        expected.put(400, 1);
+        
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));                
+        }                                   
+    }             
+    
+    public void testAccumWithDivide() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group,  " +
+                "org.apache.pig.test.utils.AccumulatorBagCount(A)/2;");                     
+
+        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+        expected.put(100, 1);
+        expected.put(200, 0);
+        expected.put(300, 1);
+        expected.put(400, 0);
+        
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));                
+        }                                   
+    }        
+    
+    public void testAccumWithAnd() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group,  " +
+                "((org.apache.pig.test.utils.AccumulatorBagCount(A)>1 and " +
+                "org.apache.pig.test.utils.AccumulatorBagCount(A)<3)?0:1);");                     
+
+        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+        expected.put(100, 0);
+        expected.put(200, 1);
+        expected.put(300, 1);
+        expected.put(400, 1);
+        
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));                
+        }                                   
+    }          
+    
+    public void testAccumWithOr() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group,  " +
+                "((org.apache.pig.test.utils.AccumulatorBagCount(A)>3 or " +
+                "org.apache.pig.test.utils.AccumulatorBagCount(A)<2)?0:1);");                     
+
+        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+        expected.put(100, 1);
+        expected.put(200, 0);
+        expected.put(300, 1);
+        expected.put(400, 0);
+        
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));                
+        }                                   
+    }  
+    
+    public void testAccumWithRegexp() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group,  " +
+                "(((chararray)org.apache.pig.test.utils.AccumulatorBagCount(A)) matches '1*' ?0:1);");                     
+
+        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+        expected.put(100, 1);
+        expected.put(200, 0);
+        expected.put(300, 1);
+        expected.put(400, 0);
+        
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));                
+        }                                   
+    }              
+    
+
+    public void testAccumWithIsNull() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group,  " +
+                "((chararray)org.apache.pig.test.utils.AccumulativeSumBag(A) is null?0:1);");                     
+
+        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+        expected.put(100, 0);
+        expected.put(200, 0);
+        expected.put(300, 1);                
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));                
+        }                                   
+    }              
+    
+    public void testAccumWithDistinct() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, f);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B { D = distinct A; generate group, org.apache.pig.test.utils.AccumulatorBagCount(D)+1;};");                     
+
+        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+        expected.put(100, 2);
+        expected.put(200, 2);
+        expected.put(300, 3);
+        expected.put(400, 2);
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));                
+        }                                   
+    }             
+    
+    public void testAccumWithSort() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, f);");
+        pigServer.registerQuery("B = foreach A generate id, f, id as t;");
+        pigServer.registerQuery("C = group B by id;");
+        pigServer.registerQuery("D = foreach C { E = order B by f; F = E.f; generate group, org.apache.pig.test.utils.AccumulativeSumBag(F);};");                     
+
+        HashMap<Integer, String> expected = new HashMap<Integer, String>();
+        expected.put(100, "(apple)(apple)");
+        expected.put(200, "(orange)");
+        expected.put(300, "(pear)(pear)(strawberry)");
+        expected.put(400, "(apple)");
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (String)t.get(1));                
+        }                                   
+    }             
+    
+    public void testAccumWithBuiltin() throws IOException {
+        pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);");
+        pigServer.registerQuery("C = group A by id;");
+        pigServer.registerQuery("D = foreach C generate group, SUM(A.v), AVG(A.v), COUNT(A.v), MIN(A.v), MAX(A.v);");                     
+
+        HashMap<Integer, Double[]> expected = new HashMap<Integer, Double[]>();
+        expected.put(100, new Double[]{15.0,3.0,5.0,1.0,5.0});
+        expected.put(200, new Double[]{6.3,2.1,3.0,1.1,3.1});
+        expected.put(300, new Double[]{3.3,3.3,1.0,3.3,3.3});
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            Double[] v = expected.get((Integer)t.get(0));
+            for(int i=0; i<v.length; i++) {
+                assertEquals(v[i].doubleValue(), ((Number)t.get(i+1)).doubleValue(), 0.0001);
+            }            
+        }    
+    }
+}

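The tests above drive UDFs through Pig's accumulator interface, where the runtime feeds each group's bag to the UDF in chunks instead of materializing it whole. Below is a minimal sketch of a count-style accumulator, assuming only the org.apache.pig.Accumulator contract (accumulate/getValue/cleanup); the class name is illustrative and this is not the org.apache.pig.test.utils.AccumulatorBagCount helper itself:

    import java.io.IOException;

    import org.apache.pig.Accumulator;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;

    // Illustrative accumulator-style UDF; hypothetical class name, not the
    // AccumulatorBagCount helper exercised by the tests above.
    public class BagCountAccumulatorSketch extends EvalFunc<Integer>
            implements Accumulator<Integer> {

        private int count = 0;

        // Called repeatedly, each time with a chunk of the group's bag.
        public void accumulate(Tuple input) throws IOException {
            count += (int) ((DataBag) input.get(0)).size();
        }

        // Called once per group, after all chunks have been accumulated.
        public Integer getValue() {
            return Integer.valueOf(count);
        }

        // Called between groups so the same UDF instance can be reused.
        public void cleanup() {
            count = 0;
        }

        // Regular exec() path for contexts where accumulation is not used.
        @Override
        public Integer exec(Tuple input) throws IOException {
            cleanup();
            accumulate(input);
            Integer result = getValue();
            cleanup();
            return result;
        }
    }

When every expression in a foreach qualifies, Pig can run the whole statement in accumulative mode; mixing AccumulatorBagCount with the plain BagCount, as testAccumBasic does, is expected to fail for exactly that reason.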
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCombiner.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCombiner.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCombiner.java Tue Nov 24 19:54:19 2009
@@ -33,6 +33,7 @@
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
+import org.apache.pig.EvalFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.PigStorage;
@@ -366,4 +367,53 @@
         }
     }
 
+    public static class JiraPig1030 extends EvalFunc<String> {
+        
+        public String exec(Tuple input) throws IOException {
+            return "";
+        }
+    }
+    
+    @Test
+    public void testJiraPig1030() {
+        // test that the combiner is NOT invoked when one of the
+        // expressions in the foreach generate is a non-algebraic UDF
+        // that takes multiple inputs (one of them a distinct aggregate).
+        
+        String input[] = {
+                "pig1\t18\t2.1",
+                "pig2\t24\t3.3",
+                "pig5\t45\t2.4",
+                "pig1\t18\t2.1",
+                "pig1\t19\t2.1",
+                "pig2\t24\t4.5",
+                "pig1\t20\t3.1" };
+ 
+        try {
+            Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
+            PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+            pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
+            pigServer.registerQuery("b = group a all;");
+            pigServer.registerQuery("c = foreach b  {" +
+                    "        d = distinct a.age;" +
+                    "        generate group, " + JiraPig1030.class.getName() + "(d, 0);};");
+            
+            // make sure there isn't a combine plan in the explain output
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            PrintStream ps = new PrintStream(baos);
+            pigServer.explain("c", ps);
+            assertFalse(baos.toString().matches("(?si).*combine plan.*"));    
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            try {
+                Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }
+    }
 }

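The combine plan assertion above turns on whether every expression in the foreach is algebraic. A UDF opts in by implementing org.apache.pig.Algebraic, naming three composable stages; JiraPig1030 deliberately implements only EvalFunc, so no combine plan may be generated. Below is a rough sketch of the algebraic shape, with hypothetical class names and a simplified count (not Pig's actual COUNT implementation):

    import java.io.IOException;

    import org.apache.pig.Algebraic;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    // Illustrative algebraic counter; the three stages compose, which is
    // what allows Pig to run Initial/Intermed in the map and combine phases.
    public class AlgebraicCountSketch extends EvalFunc<Long> implements Algebraic {

        public String getInitial()  { return Initial.class.getName(); }
        public String getIntermed() { return Intermed.class.getName(); }
        public String getFinal()    { return Final.class.getName(); }

        // Fallback when the combiner optimization does not apply.
        @Override
        public Long exec(Tuple input) throws IOException {
            return ((DataBag) input.get(0)).size();
        }

        // Map side: emit a partial count for the bag fragment seen so far.
        public static class Initial extends EvalFunc<Tuple> {
            @Override
            public Tuple exec(Tuple input) throws IOException {
                long partial = ((DataBag) input.get(0)).size();
                return TupleFactory.getInstance().newTuple(Long.valueOf(partial));
            }
        }

        // Combiner: fold partial counts into another partial count.
        public static class Intermed extends EvalFunc<Tuple> {
            @Override
            public Tuple exec(Tuple input) throws IOException {
                return TupleFactory.getInstance().newTuple(sumPartials(input));
            }
        }

        // Reduce side: fold the remaining partials into the final count.
        public static class Final extends EvalFunc<Long> {
            @Override
            public Long exec(Tuple input) throws IOException {
                return sumPartials(input);
            }
        }

        private static Long sumPartials(Tuple input) throws IOException {
            long total = 0;
            for (Tuple t : (DataBag) input.get(0)) {
                total += (Long) t.get(0);
            }
            return Long.valueOf(total);
        }
    }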
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCounters.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCounters.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCounters.java Tue Nov 24 19:54:19 2009
@@ -538,11 +538,13 @@
         File out = File.createTempFile("output", ".txt");
         out.delete();
         PigServer pigServer = new PigServer("local");
+        // FileLocalizer was initialized against HDFS by the previous tests;
+        // reset it so it re-initializes for this local-mode run.
+        FileLocalizer.setInitialized(false);
         pigServer.registerQuery("a = load '" + Util.encodeEscape(file.toString()) + "';");
         pigServer.registerQuery("b = order a by $0;");
         pigServer.registerQuery("c = group b by $0;");
         pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
-        PigStats pigStats = pigServer.store("d", out.getAbsolutePath()).getStatistics();
+        PigStats pigStats = pigServer.store("d", "file://" + out.getAbsolutePath()).getStatistics();
         InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
         long filesize = 0;
         while(is.read() != -1) filesize++;
@@ -552,8 +554,8 @@
         
         //Map<String, Map<String, String>> stats = pigStats.getPigStats();
         
-        assertEquals(count, pigStats.getRecordsWritten());
-        assertEquals(filesize, pigStats.getBytesWritten());
+        assertEquals(10, pigStats.getRecordsWritten());
+        assertEquals(110, pigStats.getBytesWritten());
 
     }
 

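Both lines changed in this hunk guard against state leaking across tests: FileLocalizer caches its initialization, and a bare output path can resolve against the HDFS configuration left behind by earlier MAPREDUCE-mode tests. A minimal sketch of the isolation pattern, using only the calls visible above; the paths and class name are illustrative:

    import org.apache.pig.PigServer;
    import org.apache.pig.impl.io.FileLocalizer;

    public class LocalModeIsolationSketch {
        public static void main(String[] args) throws Exception {
            // Earlier MAPREDUCE-mode tests may have initialized FileLocalizer
            // for HDFS; force it to re-initialize for this local-mode session.
            FileLocalizer.setInitialized(false);
            PigServer pigServer = new PigServer("local");
            pigServer.registerQuery("a = load 'file:///tmp/input.txt';");
            // An explicit file:// URI keeps the store unambiguously local.
            pigServer.store("a", "file:///tmp/output.txt");
        }
    }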
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestFRJoin.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestFRJoin.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestFRJoin.java Tue Nov 24 19:54:19 2009
@@ -22,8 +22,11 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
@@ -56,6 +59,7 @@
 
 public class TestFRJoin extends TestCase{
     private static final String INPUT_FILE = "testFrJoinInput.txt";
+    private static final String INPUT_FILE2 = "testFrJoinInput2.txt";
     private PigServer pigServer;
     private MiniCluster cluster = MiniCluster.buildCluster();
     private File tmpFile;
@@ -77,11 +81,21 @@
                 input[k++] = si + "\t" + j;
         }
         Util.createInputFile(cluster, INPUT_FILE, input);
+        
+        String[] input2 = new String[(LOOP_SIZE/2)*(LOOP_SIZE/2)];
+        k = 0;
+        for(int i = 1; i <= LOOP_SIZE/2; i++) {
+            String si = i + "";
+            for(int j=1;j<=LOOP_SIZE/2;j++)
+                input2[k++] = si + "\t" + j;
+        }
+        Util.createInputFile(cluster, INPUT_FILE2, input2);
     }
 
     @After
     public void tearDown() throws Exception {
         Util.deleteFile(cluster, INPUT_FILE);
+        Util.deleteFile(cluster, INPUT_FILE2);
     }
     
     public static class FRJoin extends EvalFunc<DataBag>{
@@ -408,8 +422,83 @@
         Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
         Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
     }
+
+    @Test
+    public void testFRJoinOut8() throws IOException {
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (x:int,y:int);");
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        Map<String,Tuple> hashFRJoin = new HashMap<String,Tuple>();
+        Map<String,Tuple> hashJoin = new HashMap<String,Tuple>();
+        {
+            pigServer.registerQuery("C = join A by $0 left, B by $0 using \"replicated\";");
+            pigServer.registerQuery("D = join A by $1 left, B by $1 using \"replicated\";");
+            pigServer.registerQuery("E = union C,D;");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+            
+            while(iter.hasNext()) {
+                Tuple tuple = iter.next();
+                String key = tuple.toDelimitedString(",");
+                hashFRJoin.put(key, tuple);
+                dbfrj.add(tuple);
+                
+            }
+        }
+        {
+            pigServer.registerQuery("C = join A by $0 left, B by $0;");
+            pigServer.registerQuery("D = join A by $1 left, B by $1;");
+            pigServer.registerQuery("E = union C,D;");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+            while(iter.hasNext()) {
+                Tuple tuple = iter.next();
+                String key = tuple.toDelimitedString(",");
+                hashJoin.put(key, tuple);
+                dbshj.add(tuple);
+            }
+        }
+        Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+                
+        Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+    }
     
     @Test
+    public void testFRJoinOut9() throws IOException {
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (x:int,y:int);");
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        Map<String,Tuple> hashFRJoin = new HashMap<String,Tuple>();
+        Map<String,Tuple> hashJoin = new HashMap<String,Tuple>();
+        {
+            pigServer.registerQuery("C = join A by $0 left, B by $0 using \"repl\";");
+            pigServer.registerQuery("D = join A by $1 left, B by $1 using \"repl\";");
+            pigServer.registerQuery("E = union C,D;");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+            
+            while(iter.hasNext()) {
+                Tuple tuple = iter.next();
+                String key = tuple.toDelimitedString(",");
+                hashFRJoin.put(key, tuple);
+                dbfrj.add(tuple);
+                
+            }
+        }
+        {
+            pigServer.registerQuery("C = join A by $0 left, B by $0;");
+            pigServer.registerQuery("D = join A by $1 left, B by $1;");
+            pigServer.registerQuery("E = union C,D;");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+            while(iter.hasNext()) {
+                Tuple tuple = iter.next();
+                String key = tuple.toDelimitedString(",");
+                hashJoin.put(key, tuple);
+                dbshj.add(tuple);
+            }
+        }
+        Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);        
+        Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+    }
+        
+    @Test
     public void testFRJoinSch1() throws IOException{
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Tue Nov 24 19:54:19 2009
@@ -78,12 +78,13 @@
             t = it.next();
             count[i] = (Long)t.get(0);
         }
-
+        
         Assert.assertFalse(it.hasNext());
 
-        Assert.assertEquals(3L, count[0]);
+        // Pig's previous local mode was broken; these are the corrected expected counts.
+        Assert.assertEquals(5L, count[0]);
         Assert.assertEquals(5L, count[1]);
-        Assert.assertEquals(5L, count[2]);
+        Assert.assertEquals(3L, count[2]);
     }
 
 

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestHBaseStorage.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestHBaseStorage.java Tue Nov 24 19:54:19 2009
@@ -16,19 +16,27 @@
  */
 package org.apache.pig.test;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 
+import junit.framework.TestCase;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.MiniZooKeeperCluster;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.pig.ExecType;
@@ -37,11 +45,10 @@
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.junit.Before;
 import org.junit.Test;
 
-import junit.framework.TestCase;
-
 /** {@link org.apache.pig.backend.hadoop.hbase.HBaseStorage} Test Case **/
 public class TestHBaseStorage extends TestCase {
 
@@ -51,6 +58,7 @@
     private MiniCluster cluster = MiniCluster.buildCluster();
     private HBaseConfiguration conf;
     private MiniHBaseCluster hbaseCluster;
+    private MiniZooKeeperCluster zooKeeperCluster;
     
     private PigServer pig;
     
@@ -70,8 +78,23 @@
     @Override
     protected void setUp() throws Exception {
         super.setUp();
+        
         conf = new HBaseConfiguration(ConfigurationUtil.
              toConfiguration(cluster.getProperties()));
+        conf.set("fs.default.name", cluster.getFileSystem().getUri().toString());
+        Path parentdir = cluster.getFileSystem().getHomeDirectory();
+        conf.set(HConstants.HBASE_DIR, parentdir.toString());
+        
+        // Make the thread wake frequency a little slower so other threads
+        // can run
+        conf.setInt("hbase.server.thread.wakefrequency", 2000);
+        
+        // Make lease timeout longer, lease checks less frequent
+        conf.setInt("hbase.master.lease.period", 10 * 1000);
+        
+        // Increase the amount of time between client retries
+        conf.setLong("hbase.client.pause", 15 * 1000);
+        
         try {
             hBaseClusterSetup();
         } catch (Exception e) {
@@ -81,17 +104,28 @@
             throw e;
         }
         
-        pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pig = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil.toProperties(conf));
     }
     
     /**
      * Actually start the MiniHBase instance.
      */
     protected void hBaseClusterSetup() throws Exception {
+        zooKeeperCluster = new MiniZooKeeperCluster();
+        int clientPort = zooKeeperCluster.startup(new File("build/test"));
+        conf.set("hbase.zookeeper.property.clientPort", Integer.toString(clientPort));
       // start the mini cluster
       hbaseCluster = new MiniHBaseCluster(conf, NUM_REGIONSERVERS);
       // opening the META table ensures that cluster is running
-      new HTable(conf, HConstants.META_TABLE_NAME);
+      while (true) {
+          try {
+              new HTable(conf, HConstants.META_TABLE_NAME);
+              break;
+          } catch (IOException e) {
+              Thread.sleep(1000);
+          }
+      }
     }
 
     @Override
@@ -108,6 +142,13 @@
                     LOG.warn("Closing mini hbase cluster", e);
                 }
             }
+            if (zooKeeperCluster != null) {
+                try {
+                    zooKeeperCluster.shutdown();
+                } catch (IOException e) {
+                    LOG.warn("Closing zookeeper cluster", e);
+                }
+            }
         } catch (Exception e) {
             LOG.error(e);
         }
@@ -122,6 +163,7 @@
     @Test
     public void testLoadFromHBase() throws IOException, ExecException {
         prepareTable();
+
         pig.registerQuery("a = load 'hbase://" + TESTTABLE + "' using " +
             "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + TESTCOLUMN_A + 
             " " + TESTCOLUMN_B + " " + TESTCOLUMN_C + "') as (col_a, col_b:int, col_c);");

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java Tue Nov 24 19:54:19 2009
@@ -456,7 +456,7 @@
         lpt.buildPlan("a = load 'a.txt' as (n:chararray, a:int); ");
         lpt.buildPlan("b = load 'b.txt' as (n:chararray, m:chararray); ");
         String[] types = new String[] { "left", "right", "full" };
-        String[] joinTypes = new String[] { "replicated", "repl", "skewed", "merge" };
+        String[] joinTypes = new String[] { "replicated", "repl", "merge" };
         for (int i = 0; i < types.length; i++) {
             for(int j = 0; j < joinTypes.length; j++) {
                 boolean errCaught = false;
@@ -466,9 +466,20 @@
                     
                 } catch(Exception e) {
                     errCaught = true;
-                    assertEquals(true, e.getMessage().contains("does not support (left|right|full) outer joins"));
+                    if (j == 0 || j == 1) {
+                        // Replicated join now supports left outer joins,
+                        // so only right and full outer are rejected for it.
+                        assertEquals(true, e.getMessage().contains("does not support (right|full) outer joins"));
+                    } else {
+                        assertEquals(true, e.getMessage().contains("does not support (left|right|full) outer joins"));
+                    }
+                }
+                if (i == 0 && (j == 0 || j == 1)) {
+                    // A left outer replicated join is now accepted, so no error is expected.
+                    assertEquals(false, errCaught);
+                } else {
+                    assertEquals(true, errCaught);
                 }
-                assertEquals(true, errCaught);
             }
             
         }

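The reworked assertions encode the new contract: a left outer join may use the replicated strategy, while right and full outer joins still fail at plan-building time with the narrower error message. Below is a short sketch of the accepted and rejected forms through PigServer, with illustrative aliases and file names:

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    // Sketch only: demonstrates which outer-join/replicated combinations
    // the front end now accepts, per the assertions above.
    public class ReplicatedLeftOuterSketch {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            pig.registerQuery("a = load 'a.txt' as (n:chararray, x:int);");
            pig.registerQuery("b = load 'b.txt' as (n:chararray, m:chararray);");
            // Accepted after this change ("repl" is the short alias):
            pig.registerQuery("c = join a by n left, b by n using \"replicated\";");
            // Still rejected, as the assertions above expect:
            //   join a by n right, b by n using "replicated";
            //   join a by n full,  b by n using "replicated";
            pig.openIterator("c");
        }
    }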
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoad.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoad.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoad.java Tue Nov 24 19:54:19 2009
@@ -162,6 +162,36 @@
         }
     }
 
+    @Test
+    public void testCommaSeparatedString() throws Exception {
+        checkLoadPath("usr/pig/a,usr/pig/b","/tmp/usr/pig/a,/tmp/usr/pig/b");
+    }
+
+    @Test
+    public void testCommaSeparatedString2() throws Exception {
+        checkLoadPath("t?s*,test","/tmp/t?s*,/tmp/test");
+    }
+
+    @Test
+    public void testCommaSeparatedString3() throws Exception {
+        checkLoadPath("hdfs:/tmp/test,hdfs:/tmp/test2,hdfs:/tmp/test3","/tmp/test,/tmp/test2,/tmp/test3");
+    }
+    
+    @Test
+    public void testCommaSeparatedString4() throws Exception {
+        checkLoadPath("usr/pig/{a,c},usr/pig/b","/tmp/usr/pig/{a,c},/tmp/usr/pig/b");
+    }
+
+    @Test
+    public void testCommaSeparatedString5() throws Exception {
+        checkLoadPath("/usr/pig/{a,c},usr/pig/b","/usr/pig/{a,c},/tmp/usr/pig/b");
+    }
+    
+    @Test
+    public void testCommaSeparatedString6() throws Exception {
+        checkLoadPath("usr/pig/{a,c},/usr/pig/b","/tmp/usr/pig/{a,c},/usr/pig/b");
+    }    
+
     private void checkLoadPath(String orig, String expected) throws Exception {
         pc.getProperties().setProperty("opt.multiquery", "" + true);