You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/10/30 21:23:56 UTC

svn commit: r831443 [1/4] - in /hadoop/pig/branches/load-store-redesign: ./ lib-src/bzip2/org/apache/tools/bzip2r/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ ...

Author: pradeepkth
Date: Fri Oct 30 20:23:54 2009
New Revision: 831443

URL: http://svn.apache.org/viewvc?rev=831443&view=rev
Log:
svn merge -r829891:831153 http://svn.apache.org/repos/asf/hadoop/pig/trunk

Added:
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/CommittableStoreFunc.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/IndexableLoadFunc.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SortColInfo.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SortInfo.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalDistinctBag.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalSortedBag.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/Utils.java
Modified:
    hadoop/pig/branches/load-store-redesign/CHANGES.txt
    hadoop/pig/branches/load-store-redesign/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Main.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/PigServer.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StandAloneParser.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreConfig.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/Distinct.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntSum.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/MAX.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/MIN.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/SUM.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultAbstractBag.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultTuple.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalCachedBag.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/NonSpillableDataBag.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOCross.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOForEach.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOProject.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOSort.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/CompilationMessageCollector.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/PlanPrinter.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/optimizer/RulePlanPrinter.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/JarManager.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/parameters/PreprocessorContext.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/streams/StreamGenerator.java
    hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCombiner.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestDataBag.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogToPhyCompiler.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMergeJoin.java

Modified: hadoop/pig/branches/load-store-redesign/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/CHANGES.txt?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/CHANGES.txt (original)
+++ hadoop/pig/branches/load-store-redesign/CHANGES.txt Fri Oct 30 20:23:54 2009
@@ -26,6 +26,20 @@
 
 IMPROVEMENTS
 
+PIG-953: Enable merge join in pig to work with loaders and store functions
+which can internally index sorted data (pradeepkth)
+
+PIG-1055: FINDBUGS: remaining "Dodgy Warnings" (olgan)
+
+PIG-1052: FINDBUGS: remaining performance warnings (olgan)
+
+PIG-1037: Converted sorted and distinct bags to use the new active spilling
+          paradigm (yinghe via gates)
+
+PIG-1051: FINDBUGS: NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE (olgan)
+
+PIG-1050: FINDBUGS: DLS_DEAD_LOCAL_STORE: Dead store to local variable (olgan)
+
 PIG-1045: Integration with Hadoop 20 New API (rding via pradeepkth)
 
 PIG-1043: FINDBUGS: SIC_INNER_SHOULD_BE_STATIC: Should be a static inner class
@@ -91,6 +105,9 @@
 
 BUG FIXES
 
+PIG-746: Works in --exectype local, fails on grid - ERROR 2113: SingleTupleBag
+should never be serialized (rding via pradeepkth)
+
 PIG-1027: Number of bytes written are always zero in local mode (zjffdu via gates)
 
 PIG-976: Multi-query optimization throws ClassCastException (rding via
@@ -130,6 +147,8 @@
 
 PIG-927: null should be handled consistently in Join (daijy)
 
+PIG-790: Error message should indicate in which line number in the Pig script the error occurred (debugging BinCond) (daijy)
+
 Release 0.5.0 - Unreleased
 
 INCOMPATIBLE CHANGES

Modified: hadoop/pig/branches/load-store-redesign/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java (original)
+++ hadoop/pig/branches/load-store-redesign/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java Fri Oct 30 20:23:54 2009
@@ -1050,9 +1050,7 @@
             b = t;
         }
         if (b > c) {
-            t = b;
             b = c;
-            c = t;
         }
         if (a > b) {
             b = a;

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/CommittableStoreFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/CommittableStoreFunc.java?rev=831443&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/CommittableStoreFunc.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/CommittableStoreFunc.java Fri Oct 30 20:23:54 2009
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+
+/**
+ * A storefunc which has an extra commit() method which is called
+ * when all mappers (when the storefunc is part of map) or reducers (when the
+ * storefunc is part of reduce) are finished. Currently this will allow storefuncs
+ * to do any cleanup/finalizing activities knowing that all the maps/reducers
+ * have finished - one such use case is for zebra storage to build an index
+ * for sorted files once all writes are done.
+ */
+public interface CommittableStoreFunc extends StoreFunc {
+    /**
+     * This method is called when all mappers (when the storefunc is part of 
+     * map) or reducers (when the storefunc is part of reduce) are finished.
+     * This allows the storeFunc to do any global commit actions - only called
+     * when all mappers/reducers successfully complete.
+     * 
+     * If the StoreFunc needs to get hold of StoreConfig object for the store
+     * it can call {@link MapRedUtil#getStoreConfig(Configuration)} where
+     * conf is the Configuration object passed in the commit() call.
+     * 
+     * @param conf Configuration object for the job
+     */
+    public void commit(Configuration conf) throws IOException;
+}

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/IndexableLoadFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/IndexableLoadFunc.java?rev=831443&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/IndexableLoadFunc.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/IndexableLoadFunc.java Fri Oct 30 20:23:54 2009
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.data.Tuple;
+
+/**
+ * This interface is intended for use by LoadFunc implementations
+ * which have an internal index for sorted data and can use the index
+ * to support merge join in pig. Interaction with the index 
+ * is abstracted away by the methods in this interface which the pig
+ * runtime will call in a particular sequence to get the records it
+ * needs to perform the merge based join.
+ * 
+ * The sequence of calls made from the pig runtime are:
+ * 
+ * {@link IndexableLoadFunc#initialize(Configuration)}
+ * IndexableLoadFunc.bindTo(filename, bufferedPositionedInputStream, 0, Long.MAX_VALUE);
+ * (the bufferedPositionedInputStream is a decorator around the underlying
+ * DFS input stream)
+ * IndexableLoadFunc.seekNear(keys);
+ * A series of IndexableLoadFunc.getNext(); calls to perform the join
+ * IndexableLoadFunc.close(); 
+ * 
+ */
+public interface IndexableLoadFunc extends LoadFunc {
+    
+    /**
+     * This method is called by pig run time to allow the
+     * IndexableLoadFunc to perform any initialization actions
+     * @param conf The job configuration object
+     */
+    public void initialize(Configuration conf) throws IOException;
+
+    /**
+     * This method is called by the pig runtime to indicate
+     * to the LoadFunc to position its underlying input stream
+     * near the keys supplied as the argument. Specifically:
+     * 1) if the keys are present in the input stream, the loadfunc
+     * implementation should position its read position to 
+     * a record where the key(s) is/are the biggest key(s) less than
+     * the key(s) supplied in the argument OR to the record with the
+     * first occurrence of the key(s) supplied.
+     * 2) if the key(s) are absent in the input stream, the implementation
+     * should position its read position to a record where the key(s)
+     * is/are the biggest key(s) less than the key(s) supplied OR to the
+     * first record where the key(s) is/are the smallest key(s) greater
+     * than the key(s) supplied. 
+     * The description above holds for descending order data in 
+     * a similar manner with "biggest" and "less than" replaced with
+     * "smallest" and "greater than" and vice versa.
+     *  
+     * @param keys Tuple with join keys (which are a prefix of the sort
+     * keys of the input data). For example if the data is sorted on
+     * columns in position 2,4,5 any of the following Tuples are
+     * valid as an argument value:
+     * (fieldAt(2))
+     * (fieldAt(2), fieldAt(4))
+     * (fieldAt(2), fieldAt(4), fieldAt(5))
+     * 
+     * The following are some invalid cases:
+     * (fieldAt(4))
+     * (fieldAt(2), fieldAt(5))
+     * (fieldAt(4), fieldAt(5))
+     * 
+     * @throws IOException When the loadFunc is unable to position
+     * to the required point in its input stream
+     */
+    public void seekNear(Tuple keys) throws IOException;
+    
+    
+    /**
+     * A method called by the pig runtime to give an opportunity
+     * for implementations to perform cleanup actions like closing
+     * the underlying input stream. This is necessary since while
+     * performing a join the pig run time may determine that no further
+     * join is possible with remaining records and may indicate to the
+     * IndexableLoader to cleanup by calling this method.
+     * 
+     * @throws IOException if the loadfunc is unable to perform
+     * its close actions.
+     */
+    public void close() throws IOException;
+}

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java Fri Oct 30 20:23:54 2009
@@ -34,7 +34,7 @@
      * not possible to return a schema that represents all returned data,
      * then null should be returned.
      * This method will be called after a 
-     * {@link LoadFunc#setLocation(String, org.apache.hadoop.conf.Configuration)}
+     * {@link LoadFunc#setLocation(String, org.apache.hadoop.mapreduce.Job)}
      * call is made on the Loader implementing {@link LoadFunc} and {@link LoadMetadata}
      */
     ResourceSchema getSchema();

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Main.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Main.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Main.java Fri Oct 30 20:23:54 2009
@@ -532,7 +532,6 @@
 private static String getVersionString() {
 	String findContainingJar = JarManager.findContainingJar(Main.class);
 	  try { 
-		  StringBuffer buffer = new  StringBuffer();
           JarFile jar = new JarFile(findContainingJar); 
           final Manifest manifest = jar.getManifest(); 
           final Map <String,Attributes> attrs = manifest.getEntries(); 

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/PigServer.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/PigServer.java Fri Oct 30 20:23:54 2009
@@ -320,14 +320,17 @@
         }
         
         if (urls.hasMoreElements()) {
-            String logMessage = "Found multiple resources that match " 
-                + jarName + ": " + resourceLocation;
+            StringBuffer sb = new StringBuffer("Found multiple resources that match ");
+            sb.append(jarName);
+            sb.append(": ");
+            sb.append(resourceLocation);
             
             while (urls.hasMoreElements()) {
-                logMessage += (logMessage + urls.nextElement() + "; ");
+                sb.append(urls.nextElement());
+                sb.append("; ");
             }
             
-            log.debug(logMessage);
+            log.debug(sb.toString());
         }
     
         return resourceLocation;

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SortColInfo.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SortColInfo.java?rev=831443&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SortColInfo.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SortColInfo.java Fri Oct 30 20:23:54 2009
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.Serializable;
+
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * A class representing information about a sort column to pass
+ * in {@link SortInfo} to storage functions in {@link StoreConfig}
+ */
+public class SortColInfo implements Serializable {
+    
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    // name of sort column
+    private String colName;
+    
+    // index position (0 based) of sort column
+    private int colIndex;
+    
+    public enum Order { ASCENDING, DESCENDING }
+    
+    private Order sortOrder;
+
+    
+    /**
+     * @param colName sort column name
+     * @param colIndex index position (0 based) of sort column
+     * @param orderingType whether the column is sorted ascending or descending
+     */
+    public SortColInfo(String colName, int colIndex, Order orderingType) {
+        this.colName = colName;
+        this.colIndex = colIndex;
+        this.sortOrder = orderingType;
+    }
+
+    /**
+     * @return the sort column name - could be null or empty string if
+     * column name could not be determined either because of the absence of
+     * a schema or because the schema had the column name as null or empty
+     * string - caller should check for these conditions.
+     */
+    public String getColName() {
+        return colName;
+    }
+
+    /**
+     * @return index position (0 based) of sort column
+     */
+    public int getColIndex() {
+        return colIndex;
+    }
+
+    /**
+     * @return whether the column is sorted ascending or descending
+     */
+    public Order getSortOrder() {
+        return sortOrder;
+    }
+    
+    /* (non-Javadoc)
+     * @see java.lang.Object#hashCode()
+     */
+    @Override
+    public int hashCode() {
+        final int prime = 31; 
+        int result = 1;
+        result = prime * result + ((colName == null) ? 0 : colName.hashCode());
+        result = prime * result + colIndex;
+        result = prime * result + ((sortOrder == Order.ASCENDING) ? 1 : 2);
+        return result;
+    }   
+    
+    /* (non-Javadoc)
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if(!Utils.checkNullAndClass(this, obj)) {
+            return false;
+        }
+        SortColInfo other = (SortColInfo)obj;
+        return Utils.checkNullEquals(this.colName, other.colName, true) &&
+        this.colIndex == other.colIndex && 
+        this.sortOrder == other.sortOrder;
+    }
+    
+    /* (non-Javadoc)
+     * @see java.lang.Object#toString()
+     */
+    @Override
+    public String toString() {
+        return "colname:" + colName +",colindex:" + colIndex + ",orderingType:" 
+        + (sortOrder == Order.ASCENDING ? "ascending" : "descending");
+    }
+    
+    
+    
+    
+}

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SortInfo.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SortInfo.java?rev=831443&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SortInfo.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SortInfo.java Fri Oct 30 20:23:54 2009
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * Class to communicate sort column information based on 
+ * order by statement's sort columns and schema
+ */
+public class SortInfo implements Serializable {
+    
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    boolean isGloballySorted = true; // in pig this is the default
+    
+    List<SortColInfo> sortColInfoList;
+    
+    /**
+     * @param sortColInfoList
+     */
+    public SortInfo(List<SortColInfo> sortColInfoList){
+        this.sortColInfoList = sortColInfoList;
+    }
+
+    /**
+     * @return the sortColInfoList
+     */
+    public List<SortColInfo> getSortColInfoList() {
+        return new ArrayList<SortColInfo>(sortColInfoList);
+    }
+    
+    /* (non-Javadoc)
+     * @see java.lang.Object#hashCode()
+     */
+    @Override
+    public int hashCode() {
+        final int prime = 31; 
+        int result = 1;
+        result = prime * result + ((sortColInfoList == null) ? 0 : 
+            sortColInfoList.hashCode());
+        result = prime * result + (isGloballySorted ? 1: 0);
+        return result;
+    }   
+
+    /**
+     * @return the isGloballySorted
+     */
+    public boolean isGloballySorted() {
+        return isGloballySorted;
+    }
+
+    
+        /* (non-Javadoc)
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if(!Utils.checkNullAndClass(this, obj)) {
+            return false;
+        }
+        SortInfo other = (SortInfo)obj;
+        return (
+            isGloballySorted == other.isGloballySorted &&
+            Utils.checkNullEquals(sortColInfoList, other.sortColInfoList, true));
+    }
+    
+    /* (non-Javadoc)
+     * @see java.lang.Object#toString()
+     */
+    @Override
+    public String toString() {
+        return "GlobalSort:" + isGloballySorted +", sort column info list:" + sortColInfoList;
+    }
+    
+
+
+}

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StandAloneParser.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StandAloneParser.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StandAloneParser.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StandAloneParser.java Fri Oct 30 20:23:54 2009
@@ -54,9 +54,11 @@
                 return;
             }
             
-            if (line.toLowerCase().equals("quit")) break;
-            if (line.toLowerCase().startsWith("#")) continue;
-            else tryParse(line);
+            if (line != null){
+                if (line.toLowerCase().equals("quit")) break;
+                if (line.toLowerCase().startsWith("#")) continue;
+                else tryParse(line);
+            }
             
         }
         
@@ -70,10 +72,10 @@
             pig.registerQuery(query);
             System.out.print("Current aliases: ");
             Map<String, LogicalPlan> aliasPlan = pig.getAliases();
-            for (Iterator<String> it = aliasPlan.keySet().iterator(); it.hasNext(); ) {
-                String alias = it.next();
-                LogicalPlan lp = aliasPlan.get(alias);
-                System.out.print(alias + "->" + lp.getLeaves().get(0).getSchema());
+            for (Iterator<Map.Entry<String,LogicalPlan>> it = aliasPlan.entrySet().iterator(); it.hasNext(); ) {
+                Map.Entry<String, LogicalPlan> e = it.next();
+                LogicalPlan lp = e.getValue();
+                System.out.print(e.getKey() + "->" + lp.getLeaves().get(0).getSchema());
                 if (it.hasNext()) System.out.print(", \n");
                 else System.out.print("\n");
             }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreConfig.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreConfig.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreConfig.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreConfig.java Fri Oct 30 20:23:54 2009
@@ -20,15 +20,17 @@
     private static final long serialVersionUID = 1L;
     private String location;
     private Schema schema;
+    private SortInfo sortInfo;
     
     
     /**
      * @param location
      * @param schema
      */
-    public StoreConfig(String location, Schema schema) {
+    public StoreConfig(String location, Schema schema, SortInfo sortInfo) {
         this.location = location;
         this.schema = schema;
+        this.sortInfo = sortInfo;
     }
     
     /**
@@ -64,4 +66,33 @@
         return "{location:" + location + ", schema:" + schema + "}";
     }
 
+    /**
+     * @param sortInfo the sortInfo to set
+     */
+    public void setSortInfo(SortInfo sortInfo) {
+        this.sortInfo = sortInfo;
+    }
+
+    /**
+     * This method returns a {@link SortInfo} object giving
+     * information on the column names in the output schema which
+     * correspond to the sort columns and which columns are
+     * ascending and those which are descending
+     * @return the sortInfo object if one could be determined, else null.
+     * null is returned in the following scenarios (wherein
+     * the sortInfo could not be determined):
+     * 1) the store does not follow an order by
+     * 2) There are operators other than limit between "order by"
+     * and store. If there is a limit between order by and store and
+     * if none of the above conditions are true, then sortInfo will be
+     * non-null.
+     * 
+     * IMPORTANT NOTE:
+     * The caller should check if the return value is null and
+     * take appropriate action
+     */
+    public SortInfo getSortInfo() {
+        return sortInfo;
+    }
+
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java Fri Oct 30 20:23:54 2009
@@ -70,8 +70,8 @@
      * can check that the correct partition keys are included;
      * a storage function to be written directly to an OutputFormat can
      * make sure the schema will translate in a well defined way.  
-     * @param schema to be checked/set
-     * @throw IOException if this schema is not acceptable.  It should include
+     * @param s to be checked/set
+     * @throws IOException if this schema is not acceptable.  It should include
      * a detailed error message indicating what is wrong with the schema.
      */
     void setSchema(ResourceSchema s) throws IOException;
@@ -95,7 +95,7 @@
      * Write a tuple the output stream to which this instance was
      * previously bound.
      * 
-     * @param f the tuple to store.
+     * @param t the tuple to store.
      * @throws IOException
      */
     void putNext(Tuple t) throws IOException;

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri Oct 30 20:23:54 2009
@@ -353,7 +353,6 @@
             return new String[] {hodHDFS, hodMapRed};
         }
         
-        try {
             // first, create temp director to store the configuration
             hodConfDir = createTempDir(server);
 			
@@ -431,12 +430,6 @@
             hodMapRed = mapred;
 
             return new String[] {hdfs, mapred};
-        } 
-        catch (Exception e) {
-            int errCode = 6010;
-            String msg = "Could not connect to HOD";
-            throw new ExecException(msg, errCode, PigException.REMOTE_ENVIRONMENT, e);
-        }
     }
 
     private synchronized void closeHod(String server){

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Fri Oct 30 20:23:54 2009
@@ -406,6 +406,11 @@
             // the reduce side.
             if (pp.getPredecessors(proj) != null) return ExprType.NOT_ALGEBRAIC;
 
+            // Check if it's a projection of a bag. Currently we can't use the combiner
+            // for statements like c = foreach b generate group, SUM(a), a;
+            // where a is a bag.
+            if (proj.getResultType() == DataType.BAG) return ExprType.NOT_ALGEBRAIC;
+            
             // Check to see if this is a projection of the grouping column.
             // If so, it will be a projection of col 0 and will have no
             // predecessors (to avoid things like group.$0, which isn't what we

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Oct 30 20:23:54 2009
@@ -435,7 +435,8 @@
                 // hadoop from the jobconf.
                 conf.set("pig.storeFunc", ObjectSerializer.serialize(outputFuncSpec.toString()));
                 conf.set(PIG_STORE_CONFIG, 
-                            ObjectSerializer.serialize(new StoreConfig(outputPath, st.getSchema())));
+                            ObjectSerializer.serialize(
+                                    new StoreConfig(outputPath, st.getSchema(), st.getSortInfo())));
                 
                 conf.set("pig.streaming.log.dir", 
                             new Path(outputPath, LOG_DIR).toString());

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java Fri Oct 30 20:23:54 2009
@@ -60,7 +60,7 @@
     String newLine = "\n";
     boolean pigException = false;
     boolean outOfMemory = false;
-    final String OOM_ERR = "OutOfMemoryError";
+    static final String OOM_ERR = "OutOfMemoryError";
 
     protected List<FileSpec> succeededStores = null;
     protected List<FileSpec> failedStores = null;
@@ -470,7 +470,11 @@
                 		}
             			
                 		//could receive a number format exception here but it will be propagated up the stack                		
-                		int errCode = Integer.parseInt(code);
+                		int errCode;
+                        if (code != null) 
+                            errCode = Integer.parseInt(code);
+                        else
+                            errCode = 2998;
                 		
                 		//create the exception with the message and then set the error code and error source
                 		FuncSpec funcSpec = new FuncSpec(exceptionName, exceptionMessage);		                		
@@ -570,12 +574,15 @@
         String declaringClass = items[0]; 
         //the last member is always the method name
         String methodName = items[items.length - 1];
+        StringBuilder sb = new StringBuilder();
         
         //concatenate the names by adding the dot (.) between the members till the penultimate member
         for(int i = 1; i < items.length - 1; ++i) {
-        	declaringClass += ".";
-        	declaringClass += items[i];
+        	sb.append('.');
+        	sb.append(items[i]);
         }
+
+        declaringClass = sb.toString();
         
         //from the file details extract the file name and the line number
         //PigMapOnly.java:65

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Oct 30 20:23:54 2009
@@ -32,12 +32,15 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.FuncSpec;
+import org.apache.pig.IndexableLoadFunc;
+import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
 import org.apache.pig.SamplableLoader;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.DefaultIndexableLoader;
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.builtin.PoissonSampleLoader;
 import org.apache.pig.impl.builtin.MergeJoinIndexer;
@@ -1148,92 +1151,131 @@
 	    rightMROpr.requestedParallelism = 1; // we need exactly one reducer for indexing job.        
             
             // At this point, we must be operating on map plan of right input and it would contain nothing else other then a POLoad.
-            POLoad rightLoader = (POLoad)rightMROpr.mapPlan.getRoots().get(0);
-            joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
-
-            // Replace POLoad with  indexer.
-            String[] indexerArgs = new String[3];
-            indexerArgs[0] = rightLoader.getLFile().getFuncSpec().toString();
-             if (! (PigContext.instantiateFuncFromSpec(indexerArgs[0]) instanceof SamplableLoader)){
-                 int errCode = 1104;
-                 String errMsg = "Right input of merge-join must implement SamplableLoader interface. The specified loader " + indexerArgs[0] + " doesn't implement it";
-                 throw new MRCompilerException(errMsg,errCode);
-             }
-            List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1);
-            indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans);
-            indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
-            FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
-            rightLoader.setLFile(lFile);
-
-            // Loader of mro will return a tuple of form (key1, key2, ..,filename, offset)
-            // Now set up a POLocalRearrange which has "all" as the key and tuple fetched
-            // by loader as the "value" of POLocalRearrange
-            // Sorting of index can possibly be achieved by using Hadoop sorting between map and reduce instead of Pig doing sort. If that is so, 
-            // it will simplify lot of the code below.
-            
-            PhysicalPlan lrPP = new PhysicalPlan();
-            ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
-            ce.setValue("all");
-            ce.setResultType(DataType.CHARARRAY);
-            lrPP.add(ce);
-
-            List<PhysicalPlan> lrInnerPlans = new ArrayList<PhysicalPlan>();
-            lrInnerPlans.add(lrPP);
-
-            POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
-            lr.setIndex(0);
-            lr.setKeyType(DataType.CHARARRAY);
-            lr.setPlans(lrInnerPlans);
-            lr.setResultType(DataType.TUPLE);
-            rightMROpr.mapPlan.addAsLeaf(lr);
-
-            rightMROpr.setMapDone(true);
-
-            // On the reduce side of this indexing job, there will be a global rearrange followed by POSort.
-            // Output of POSort will be index file dumped on the DFS.
-
-            // First add POPackage.
-            POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
-            pkg.setKeyType(DataType.CHARARRAY);
-            pkg.setNumInps(1); 
-            pkg.setInner(new boolean[]{false});
-            rightMROpr.reducePlan.add(pkg);
-
-            // Next project tuples from the bag created by POPackage.
-            POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-            topPrj.setColumn(1);
-            topPrj.setResultType(DataType.TUPLE);
-            topPrj.setOverloaded(true);
-            rightMROpr.reducePlan.add(topPrj);
-            rightMROpr.reducePlan.connect(pkg, topPrj);
-
-            // Now create and add POSort. Sort plan is project *.
-            List<PhysicalPlan> sortPlans = new ArrayList<PhysicalPlan>(1);
-            PhysicalPlan innerSortPlan = new PhysicalPlan();
-            POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-            prj.setStar(true);
-            prj.setOverloaded(false);
-            prj.setResultType(DataType.TUPLE);
-            innerSortPlan.add(prj);
-            sortPlans.add(innerSortPlan);
-
-            // Currently we assume all columns are in asc order.
-            // Add two because filename and offset are added by Indexer in addition to keys.
-            List<Boolean>  mAscCols = new ArrayList<Boolean>(rightInpPlans.size()+2);
-            for(int i=0; i< rightInpPlans.size()+2; i++)
-                mAscCols.add(true);
-
-            POSort sortOp = new POSort(new OperatorKey(scope,nig.getNextNodeId(scope)),1, null, sortPlans, mAscCols, null);
-            rightMROpr.reducePlan.add(sortOp);
-            rightMROpr.reducePlan.connect(topPrj, sortOp);
-
-            POStore st = getStore();
-            FileSpec strFile = getTempFileSpec();
-            st.setSFile(strFile);
-            rightMROpr.reducePlan.addAsLeaf(st);
-            rightMROpr.setReduceDone(true);
+            POLoad rightLoader = (POLoad)rightMROpr.mapPlan.getRoots().get(0);            
+            LoadFunc rightLoadFunc = (LoadFunc) PigContext.instantiateFuncFromSpec(rightLoader.getLFile().getFuncSpec());
+            if(rightLoadFunc instanceof IndexableLoadFunc) {
+                joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
+                joinOp.setRightInputFileName(rightLoader.getLFile().getFileName());
+                rightMROpr = null; // we don't need the right MROper since
+                // the right loader is an IndexableLoadFunc which can handle the index
+                // itself 
+                // Validate that the join keys in the merge join are only
+                // simple column projections or '*' and not expressions. Expressions
+                // cannot be handled when the index is built by the storage layer on the sorted
+                // data when the sorted data (and corresponding index) is written.
+                // So merge join is restricted to not have expressions as
+                // join keys.
+                int numInputs = mPlan.getPredecessors(joinOp).size(); // should be 2
+                for(int i = 0; i < numInputs; i++) {
+                    List<PhysicalPlan> keyPlans = joinOp.getInnerPlansOf(i);
+                    for (PhysicalPlan keyPlan : keyPlans) {
+                        for(PhysicalOperator op : keyPlan) {
+                            if(!(op instanceof POProject)) {
+                                int errCode = 1106;
+                                String errMsg = "Merge join is possible only for simple column or '*' join keys when using " +
+                                rightLoader.getLFile().getFuncSpec() + " as the loader";
+                                throw new MRCompilerException(errMsg, errCode, PigException.INPUT);
+                            }
+                        }
+                    }
+                }
+            } else {
+                // Replace POLoad with  indexer.
+                String[] indexerArgs = new String[3];
+                FileSpec origRightLoaderFileSpec = rightLoader.getLFile();
+                indexerArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
+                 if (! (PigContext.instantiateFuncFromSpec(indexerArgs[0]) instanceof SamplableLoader)){
+                     int errCode = 1104;
+                     String errMsg = "Right input of merge-join must implement SamplableLoader interface. The specified loader " + indexerArgs[0] + " doesn't implement it";
+                     throw new MRCompilerException(errMsg,errCode);
+                 }
+                List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1);
+                indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans);
+                indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
+                FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
+                rightLoader.setLFile(lFile);
+    
+                // Loader of mro will return a tuple of form (key1, key2, ..,filename, offset)
+                // Now set up a POLocalRearrange which has "all" as the key and tuple fetched
+                // by loader as the "value" of POLocalRearrange
+                // Sorting of index can possibly be achieved by using Hadoop sorting between map and reduce instead of Pig doing sort. If that is so, 
+                // it will simplify a lot of the code below.
+                
+                PhysicalPlan lrPP = new PhysicalPlan();
+                ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
+                ce.setValue("all");
+                ce.setResultType(DataType.CHARARRAY);
+                lrPP.add(ce);
+    
+                List<PhysicalPlan> lrInnerPlans = new ArrayList<PhysicalPlan>();
+                lrInnerPlans.add(lrPP);
+    
+                POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
+                lr.setIndex(0);
+                lr.setKeyType(DataType.CHARARRAY);
+                lr.setPlans(lrInnerPlans);
+                lr.setResultType(DataType.TUPLE);
+                rightMROpr.mapPlan.addAsLeaf(lr);
+    
+                rightMROpr.setMapDone(true);
+    
+                // On the reduce side of this indexing job, there will be a global rearrange followed by POSort.
+                // Output of POSort will be index file dumped on the DFS.
+    
+                // First add POPackage.
+                POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
+                pkg.setKeyType(DataType.CHARARRAY);
+                pkg.setNumInps(1); 
+                pkg.setInner(new boolean[]{false});
+                rightMROpr.reducePlan.add(pkg);
+    
+                // Next project tuples from the bag created by POPackage.
+                POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+                topPrj.setColumn(1);
+                topPrj.setResultType(DataType.TUPLE);
+                topPrj.setOverloaded(true);
+                rightMROpr.reducePlan.add(topPrj);
+                rightMROpr.reducePlan.connect(pkg, topPrj);
+    
+                // Now create and add POSort. Sort plan is project *.
+                List<PhysicalPlan> sortPlans = new ArrayList<PhysicalPlan>(1);
+                PhysicalPlan innerSortPlan = new PhysicalPlan();
+                POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+                prj.setStar(true);
+                prj.setOverloaded(false);
+                prj.setResultType(DataType.TUPLE);
+                innerSortPlan.add(prj);
+                sortPlans.add(innerSortPlan);
+    
+                // Currently we assume all columns are in asc order.
+                // Add two because filename and offset are added by Indexer in addition to keys.
+                List<Boolean>  mAscCols = new ArrayList<Boolean>(rightInpPlans.size()+2);
+                for(int i=0; i< rightInpPlans.size()+2; i++)
+                    mAscCols.add(true);
+    
+                POSort sortOp = new POSort(new OperatorKey(scope,nig.getNextNodeId(scope)),1, null, sortPlans, mAscCols, null);
+                rightMROpr.reducePlan.add(sortOp);
+                rightMROpr.reducePlan.connect(topPrj, sortOp);
+    
+                POStore st = getStore();
+                FileSpec strFile = getTempFileSpec();
+                st.setSFile(strFile);
+                rightMROpr.reducePlan.addAsLeaf(st);
+                rightMROpr.setReduceDone(true);
+                
+                // set up the DefaultIndexableLoader for the join operator
+                String[] defaultIndexableLoaderArgs = new String[4];
+                defaultIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
+                defaultIndexableLoaderArgs[1] = strFile.getFileName();
+                defaultIndexableLoaderArgs[2] = strFile.getFuncSpec().toString();
+                defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope;
+                joinOp.setRightLoaderFuncSpec((new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs)));
+                joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());    
+                 
+            }
+            
    
-            joinOp.setIndexFile(strFile);
+//            joinOp.setIndexFile(strFile);
             
             // We are done with right side. Lets work on left now.
             // Join will be materialized in leftMROper.
@@ -1254,9 +1296,10 @@
                 String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
                 throw new PlanException(msg, errCode, PigException.BUG);
             }
-
-            // We want to ensure indexing job runs prior to actual join job. So, connect them in order.
-            MRPlan.connect(rightMROpr, curMROp);
+            if(rightMROpr != null) {
+                // We want to ensure indexing job runs prior to actual join job. So, connect them in order.
+                MRPlan.connect(rightMROpr, curMROp);
+            }
             phyToMROpMap.put(joinOp, curMROp);
         }
         catch(PlanException e){

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Fri Oct 30 20:23:54 2009
@@ -33,6 +33,7 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.pig.ResourceSchema;
+import org.apache.pig.SortInfo;
 import org.apache.pig.StoreConfig;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
@@ -73,7 +74,7 @@
     
     @SuppressWarnings("unchecked")
     @Override
-    public StoreFunc createStoreFunc(FileSpec sFile, Schema schema) 
+    public StoreFunc createStoreFunc(FileSpec sFile, Schema schema, SortInfo sortInfo) 
         throws IOException {
 
         Configuration outputConf = context.getConfiguration();

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Fri Oct 30 20:23:54 2009
@@ -299,8 +299,6 @@
                     rp.explain(baos);
                     log.debug(baos.toString());
                 }
-                // till here
-                
                 pigReporter = new ProgressableReporter();
                 if(!(rp.isEmpty())) {
                     roots = rp.getRoots().toArray(new PhysicalOperator[1]);

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Fri Oct 30 20:23:54 2009
@@ -58,7 +58,6 @@
     List<POStore> reduceStores;
     
     /**
-     * @param outputPath
      * @param context
      * @throws IOException
      */
@@ -110,14 +109,11 @@
      */
     @Override
     public void cleanupJob(JobContext context) throws IOException {
-        System.err.println("XXX: IN CLEANUPJOB");
         // call clean up on all map and reduce committers
         for (OutputCommitter mapCommitter : mapOutputCommitters) {
-            System.err.println("XXX: calling cleanup on map committers");
             mapCommitter.cleanupJob(context);
         }
         for (OutputCommitter reduceCommitter : reduceOutputCommitters) {
-            System.err.println("XXX: calling cleanup on reduce committers");
             reduceCommitter.cleanupJob(context);
         }
        
@@ -130,12 +126,10 @@
     public void abortTask(TaskAttemptContext context) throws IOException {
         if(context.getTaskAttemptID().isMap()) {
             for (OutputCommitter mapCommitter : mapOutputCommitters) {
-                System.err.println("XXX: calling abort task on map committers");
                 mapCommitter.abortTask(context);
             } 
         } else {
             for (OutputCommitter reduceCommitter : reduceOutputCommitters) {
-                System.err.println("XXX: calling abort task on reduce committers");
                 reduceCommitter.abortTask(context);
             } 
         }
@@ -148,12 +142,10 @@
     public void commitTask(TaskAttemptContext context) throws IOException {
         if(context.getTaskAttemptID().isMap()) {
             for (OutputCommitter mapCommitter : mapOutputCommitters) {
-                System.err.println("XXX: calling commitTask on map committers");
                 mapCommitter.commitTask(context);
             } 
         } else {
             for (OutputCommitter reduceCommitter : reduceOutputCommitters) {
-                System.err.println("XXX: calling commitTask on reduce committers");
                 reduceCommitter.commitTask(context);
             } 
         }
@@ -168,17 +160,13 @@
         boolean needCommit = false;
         if(context.getTaskAttemptID().isMap()) {
             for (OutputCommitter mapCommitter : mapOutputCommitters) {
-                System.err.println("XXX: calling needsTaskCommit on map committers");
                 needCommit = needCommit || mapCommitter.needsTaskCommit(context);
             } 
-            System.err.println("XXX: returning " + needCommit + " for needscommmut");
             return needCommit;
         } else {
             for (OutputCommitter reduceCommitter : reduceOutputCommitters) {
-                System.err.println("XXX: calling needsTaskCommit on reduce committers");
                 needCommit = needCommit || reduceCommitter.needsTaskCommit(context);
             } 
-            System.err.println("XXX: returning " + needCommit + " for needscommmut");
             return needCommit;
         }
     }
@@ -188,14 +176,11 @@
      */
     @Override
     public void setupJob(JobContext context) throws IOException {
-        System.err.println("XXX: in SETUPJOB");
         // call set up on all map and reduce committers
         for (OutputCommitter mapCommitter : mapOutputCommitters) {
-            System.err.println("XXX: calling setup on map committers");
             mapCommitter.setupJob(context);
         }
         for (OutputCommitter reduceCommitter : reduceOutputCommitters) {
-            System.err.println("XXX: calling setup on reduce committers");
             reduceCommitter.setupJob(context);
         }
     }
@@ -207,15 +192,12 @@
     public void setupTask(TaskAttemptContext context) throws IOException {
         if(context.getTaskAttemptID().isMap()) {
             for (OutputCommitter mapCommitter : mapOutputCommitters) {
-                System.err.println("XXX: calling setupTask on map committers");
                 mapCommitter.setupTask(context);
             } 
         } else {
             for (OutputCommitter reduceCommitter : reduceOutputCommitters) {
-                System.err.println("XXX: calling setupTask on reduce committers");
                 reduceCommitter.setupTask(context);
             } 
         }
     }
 }
-

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Fri Oct 30 20:23:54 2009
@@ -31,6 +31,7 @@
 import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.ObjectSerializer;
 

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java Fri Oct 30 20:23:54 2009
@@ -187,7 +187,7 @@
         @Override
         protected String[] getAttributes(InnerOperator op) {
             String[] attributes = new String[3];
-            attributes[0] = "label=\""+getName(op)+"\"";
+            attributes[0] = "label=\""+super.getName(op)+"\"";
             attributes[1] = "style=\"filled\"";
             attributes[2] = "fillcolor=\"white\"";
             return attributes;

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Fri Oct 30 20:23:54 2009
@@ -33,6 +33,7 @@
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
+import org.apache.pig.SortInfo;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.NonSpillableDataBag;
@@ -1365,6 +1366,11 @@
         }
 
         sort.setResultType(s.getType());
+        try {
+            sort.setSortInfo(s.getSortInfo());
+        } catch (FrontendException e) {
+            throw new LogicalToPhysicalTranslatorException(e);
+        }
 
     }
 
@@ -1556,6 +1562,7 @@
     @Override
     public void visit(LOStore loStore) throws VisitorException {
         String scope = loStore.getOperatorKey().scope;
+        
         POStore store = new POStore(new OperatorKey(scope, nodeGen
                 .getNextNodeId(scope)));
         store.setSFile(loStore.getOutputFile());
@@ -1581,6 +1588,19 @@
         
         if(op != null) {
             from = logToPhyMap.get(op.get(0));
+            SortInfo sortInfo = null;
+            // if store's predecessor is limit,
+            // check limit's predecessor
+            if(op.get(0) instanceof LOLimit) {
+                op = loStore.getPlan().getPredecessors(op.get(0));
+            }
+            PhysicalOperator sortPhyOp = logToPhyMap.get(op.get(0));
+            // if this predecessor is a sort, get
+            // the sort info.
+            if(op.get(0) instanceof LOSort) {
+                sortInfo = ((POSort)sortPhyOp).getSortInfo();
+            }
+            store.setSortInfo(sortInfo);
         } else {
             int errCode = 2051;
             String msg = "Did not find a predecessor for Store." ;

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Fri Oct 30 20:23:54 2009
@@ -276,7 +276,7 @@
     }
 
     public Result getNext(DataBag db) throws ExecException {
-        Result ret = new Result();
+        Result ret = null;
         DataBag tmpBag = BagFactory.getInstance().newDefaultBag();
         for(ret = getNext(dummyTuple);ret.returnStatus!=POStatus.STATUS_EOP;ret=getNext(dummyTuple)){
             if(ret.returnStatus == POStatus.STATUS_ERR) return ret;

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Fri Oct 30 20:23:54 2009
@@ -257,13 +257,13 @@
                 new ArrayList<PhysicalOperator>(inputs.size());
             PhysicalOperator cloneOp = matches.get(op);
             if (cloneOp == null) {
-                String msg = "Unable to find clone for op " + cloneOp.name();
+                String msg = "Unable to find clone for op " + op.name();
                 throw new CloneNotSupportedException(msg);
             }
             for (PhysicalOperator iOp : inputs) {
                 PhysicalOperator cloneIOp = matches.get(iOp);
                 if (cloneIOp == null) {
-                    String msg = "Unable to find clone for op " + cloneIOp.name();
+                    String msg = "Unable to find clone for op " + iOp.name();
                     throw new CloneNotSupportedException(msg);
                 }
                 newInputs.add(cloneIOp);

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java Fri Oct 30 20:23:54 2009
@@ -25,6 +25,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -32,6 +33,7 @@
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.NonSpillableDataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -120,13 +122,25 @@
         keyLookup = lrKeyInfo.second;
     }
 
+    private DataBag createDataBag() {
+    	String bagType = null;
+        if (PigMapReduce.sJobConf != null) {
+   			bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");       			
+   	    }
+                		          	           		
+    	if (bagType != null && bagType.equalsIgnoreCase("default")) {
+    		return new NonSpillableDataBag();
+    	}
+    	return new InternalCachedBag();  	
+    }
+    
     @Override
     public Result getNext(Tuple t) throws ExecException {
         int keyField = -1;
         //Create numInputs bags
         Object[] fields = new Object[mBags.length];
         for (int i = 0; i < mBags.length; i++) {
-            if (mBags[i]) fields[i] = new NonSpillableDataBag();
+            if (mBags[i]) fields[i] = createDataBag();            
         }
         
         // For each indexed tup in the inp, split them up and place their

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Fri Oct 30 20:23:54 2009
@@ -24,6 +24,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -31,6 +32,8 @@
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalDistinctBag;
+import org.apache.pig.data.InternalSortedBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -73,9 +76,21 @@
 
     @Override
     public Result getNext(Tuple t) throws ExecException {
-        if (!inputsAccumulated) {
-            Result in = processInput();
-            distinctBag = BagFactory.getInstance().newDistinctBag();
+         if (!inputsAccumulated) {
+            Result in = processInput();    
+            
+            // by default, we create an InternalDistinctBag, unless the user explicitly
+            // sets pig.cachedbag.distinct.type to "default" to use the old distinct bag
+           	String bagType = null;
+            if (PigMapReduce.sJobConf != null) {
+       			bagType = PigMapReduce.sJobConf.get("pig.cachedbag.distinct.type");       			
+       	    }            
+            if (bagType != null && bagType.equalsIgnoreCase("default")) {        	    	
+            	distinctBag = BagFactory.getInstance().newDistinctBag();    			
+       	    } else {
+       	    	distinctBag = new InternalDistinctBag(3);
+    	    }
+            
             while (in.returnStatus != POStatus.STATUS_EOP) {
                 if (in.returnStatus == POStatus.STATUS_ERR) {
                     log.error("Error in reading from inputs");

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Fri Oct 30 20:23:54 2009
@@ -95,22 +95,6 @@
     private transient BagFactory mBagFactory;
     private boolean setUp;
     
-    public POFRJoin(OperatorKey k) throws PlanException, ExecException {
-        this(k,-1,null, null, null, null, -1);
-    }
-
-    public POFRJoin(OperatorKey k, int rp) throws PlanException, ExecException {
-        this(k, rp, null, null, null, null, -1);
-    }
-
-    public POFRJoin(OperatorKey k, List<PhysicalOperator> inp) throws PlanException, ExecException {
-        this(k, -1, inp, null, null, null, -1);
-    }
-
-    public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp) throws PlanException, ExecException {
-        this(k,rp,inp,null, null, null, -1);
-    }
-    
     public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, List<List<PhysicalPlan>> ppLists, List<List<Byte>> keyTypes, FileSpec[] replFiles, int fragment) throws ExecException{
         super(k,rp,inp);