You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2009/07/30 02:30:26 UTC

svn commit: r799141 [2/2] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/ src/org/apache/pig/backend/hadoop/...

Added: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=799141&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Thu Jul 30 00:30:25 2009
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * Partition reducers for skewed keys. This is used in skewed join during the
+ * sampling process. It figures out how many reducers are required to process a
+ * skewed key without causing a spill and allocates that number of reducers to
+ * the key. This UDF outputs a map which contains 2 keys:
+ * 
+ * <li>&quot;totalreducers&quot;: the value is an integer which indicates the
+ *         total number of reducers for this join job </li>
+ * <li>&quot;partition.list&quot;: the value is a bag which contains a
+ * list of tuples with each tuple representing partitions for a skewed key.
+ * The tuple has format of &lt;join key&gt;,&lt;min index of reducer&gt;, 
+ * &lt;max index of reducer&gt; </li>
+ * 
+ * For example, a join job configures 10 reducers, and the sampling process 
+ * finds out 2 skewed keys, &quot;swpv&quot; needs 4 reducers and &quot;swps&quot;
+ * needs 2 reducers. The output file would look like the following:
+ * 
+ * {totalreducers=10, partition.list={(swpv,0,3), (swps,4,5)}}
+ *
+ * The name of this file is passed to the next MR job, which does the actual join.
+ * That job uses this information to partition the skewed keys properly.
+ * 
+ */
+
+public class PartitionSkewedKeys extends EvalFunc<Map<String, Object>> {
+
+	public static final String PARTITION_LIST = "partition.list";
+
+	public static final String TOTAL_REDUCERS = "totalreducers";
+
+	private Log log = LogFactory.getLog(getClass());
+
+	BagFactory mBagFactory = BagFactory.getInstance();
+
+	TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+	private int currentIndex_;
+
+	private int totalReducers_;
+
+	private long totalMemory_;
+
+	private String inputFile_;
+
+	private long inputFileSize_;
+
+	private long totalSampleCount_;
+
+	private double heapPercentage_;
+	
+    // specify how many tuple a reducer can hold for a key
+    // this is for testing purpose. If not specified, then
+    // it is calculated based on memory size and size of tuple
+	private int tupleMCount_; 
+
+	/**
+	 * Creates the UDF with default settings: a heap percentage of 0.5, no
+	 * tuple-count override and no input file.
+	 */
+	public PartitionSkewedKeys() {
+		this(null);
+	}
+
+	/**
+	 * Creates the UDF from its string arguments.
+	 *
+	 * @param args either null/empty (the defaults are used) or at least three
+	 *            values: [0] the fraction of the maximum heap that may be
+	 *            used, [1] the number of tuples a reducer can hold for a key
+	 *            (a value <= 0 means it is derived from the memory size),
+	 *            [2] the input file whose size is used for the estimate
+	 * @throws IllegalArgumentException if args is non-empty but has fewer
+	 *             than three elements
+	 * @throws NumberFormatException if args[0] or args[1] is not a number
+	 */
+	public PartitionSkewedKeys(String[] args) {
+		totalReducers_ = -1;
+		currentIndex_ = 0;
+		inputFileSize_ = -1;
+
+		if (args != null && args.length > 0) {
+			// all three values are read below; fail with a clear message
+			// instead of an ArrayIndexOutOfBoundsException
+			if (args.length < 3) {
+				throw new IllegalArgumentException(
+						"PartitionSkewedKeys expects 3 arguments (heap percentage, "
+						+ "tuple count, input file) but got " + args.length);
+			}
+			heapPercentage_ = Double.parseDouble(args[0]);
+			tupleMCount_ = Integer.parseInt(args[1]);
+			inputFile_ = args[2];
+		} else {
+			heapPercentage_ = 0.5;
+		}
+
+		if (log.isDebugEnabled()) {
+			log.debug("pig.skewedjoin.reduce.memusage=" + heapPercentage_);
+		}
+
+		log.info("input file: " + inputFile_);
+	}
+
+	/**
+	 * Computes the reducer partitioning for the sampled join keys.
+	 *
+	 * @param in tuple whose first field is the number of reducers (Integer)
+	 *            and whose second field is the *sorted* bag of samples; the
+	 *            last two fields of every sample are read as its memory size
+	 *            and its disk size, the fields before them are compared as
+	 *            the join key
+	 * @return a map holding the bag of partition tuples under
+	 *         {@link #PARTITION_LIST} and the reducer count under
+	 *         {@link #TOTAL_REDUCERS}, or null if the input is null or empty
+	 * @throws RuntimeException if the input file size cannot be determined,
+	 *             the input cannot be read, or a key needs more reducers
+	 *             than are available
+	 */
+	public Map<String, Object> exec(Tuple in) throws IOException {
+		// nothing to partition; checked first so that no file system access
+		// is attempted for empty input
+		if (in == null || in.size() == 0) {
+			return null;
+		}
+
+		// get size of input file in bytes (looked up once per instance)
+		if (inputFileSize_ == -1) {
+			try {
+				inputFileSize_ = FileLocalizer.getSize(inputFile_);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+		}
+
+		Map<String, Object> output = new HashMap<String, Object>();
+
+		totalMemory_ = (long) (Runtime.getRuntime().maxMemory() * heapPercentage_);
+		log.info("Maximum of available memory is " + totalMemory_);
+
+		ArrayList<Tuple> reducerList = new ArrayList<Tuple>();
+
+		try {
+			totalReducers_ = (Integer) in.get(0);
+			DataBag samples = (DataBag) in.get(1);
+
+			totalSampleCount_ = samples.size();
+
+			log.info("inputFileSize: " + inputFileSize_);
+			log.info("totalSample: " + totalSampleCount_);
+			log.info("totalReducers: " + totalReducers_);
+
+			Tuple currentTuple = null;
+			long count = 0;
+			long totalMSize = 0;
+			long totalDSize = 0;
+			int maxReducers = 0;
+			Iterator<Tuple> iter = samples.iterator();
+			while (iter.hasNext()) {
+				Tuple t = iter.next();
+				if (currentTuple == null || hasSameKey(currentTuple, t)) {
+					// still inside the run of samples for the current key
+					count++;
+					totalMSize += getMemorySize(t);
+					totalDSize += getDiskSize(t);
+				} else {
+					// key changed: close the previous key and start a new run
+					maxReducers = Math.max(maxReducers, addPartition(
+							currentTuple, count, totalMSize, totalDSize, reducerList));
+					count = 1;
+					totalMSize = getMemorySize(t);
+					totalDSize = getDiskSize(t);
+				}
+
+				currentTuple = t;
+			}
+
+			// add last key
+			if (count > 0) {
+				maxReducers = Math.max(maxReducers, addPartition(
+						currentTuple, count, totalMSize, totalDSize, reducerList));
+			}
+
+			if (maxReducers > totalReducers_) {
+				throw new RuntimeException("You need at least " + maxReducers
+						+ " reducers to run this job.");
+			}
+
+			output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList));
+			output.put(TOTAL_REDUCERS, Integer.valueOf(totalReducers_));
+
+			log.info(output.toString());
+
+			return output;
+		} catch (RuntimeException e) {
+			// already unchecked (e.g. the "not enough reducers" error above);
+			// re-wrapping would only bury the message
+			throw e;
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * Computes the reducers needed by one key, appends its partition tuple to
+	 * reducerList if the key turned out to be skewed, and returns the number
+	 * of reducers the key needs.
+	 */
+	private int addPartition(Tuple key, long count, long totalMSize,
+			long totalDSize, ArrayList<Tuple> reducerList) {
+		Pair<Tuple, Integer> p = calculateReducers(key, count, totalMSize,
+				totalDSize);
+		if (p.first != null) {
+			reducerList.add(p.first);
+		}
+		return p.second;
+	}
+
+	/**
+	 * Estimates how many reducers a key needs and, for a skewed key, assigns
+	 * it the next range of reducer indexes (wrapping around at
+	 * totalReducers_).
+	 *
+	 * @param currentTuple a sample carrying the key; all fields except the
+	 *            last two are copied into the result as the key
+	 * @param count number of samples seen for this key
+	 * @param totalMSize sum of the memory sizes of those samples
+	 * @param totalDSize sum of the disk sizes of those samples
+	 * @return a pair of (partition tuple, reducers needed); the tuple is null
+	 *         and the count is 1 when the key is not skewed. The partition
+	 *         tuple has the key fields followed by the min and the max
+	 *         reducer index
+	 */
+	private Pair<Tuple, Integer> calculateReducers(Tuple currentTuple,
+			long count, long totalMSize, long totalDSize) {
+		// get average memory size per tuple
+		double avgM = totalMSize / (double) count;
+		// get average disk size per tuple
+		double avgD = totalDSize / (double) count;
+
+		// get the number of tuples that can fit into memory
+		long tupleMCount = (tupleMCount_ <= 0)?(long) (totalMemory_ / avgM): tupleMCount_;
+
+		// get the number of total tuples for this key: the key's share of the
+		// samples, scaled to the input file size, divided by the tuple size
+		long tupleCount = (long) (((double) count) / totalSampleCount_
+				* inputFileSize_ / avgD);
+
+		int redCount = (int) Math.round(Math.ceil((double) tupleCount
+				/ tupleMCount));
+
+		if (log.isDebugEnabled())
+		{
+			log.debug("avgM: " + avgM);
+			log.debug("avgD: " + avgD);
+			log.debug("count: " + count);
+			log.debug("A reducer can take " + tupleMCount + " tuples and "
+					+ tupleCount + " tuples are find for " + currentTuple);
+			log.debug("key " + currentTuple + " need " + redCount + " reducers");
+		}
+
+		// this is not a skewed key. The estimate can round down to 0 for a
+		// rare key; treating that as skewed would produce a range whose max
+		// index lies before its min index.
+		if (redCount <= 1) {
+			return new Pair<Tuple, Integer>(null, 1);
+		}
+
+		Tuple t = this.mTupleFactory.newTuple(currentTuple.size());
+		int i = 0;
+		try {
+			// set keys
+			for (; i < currentTuple.size() - 2; i++) {
+				t.set(i, currentTuple.get(i));
+			}
+
+			// set the min index of reducer for this key
+			t.set(i++, currentIndex_);
+			// the last reducer of the range, wrapping around after the
+			// highest reducer index (redCount >= 2 here, so the left operand
+			// is never negative)
+			currentIndex_ = (currentIndex_ + redCount - 1) % totalReducers_;
+			// set the max index of reducer for this key
+			t.set(i++, currentIndex_);
+		} catch (ExecException e) {
+			throw new RuntimeException("Failed to set value to tuple.", e);
+		}
+
+		// the next skewed key starts right after this range
+		currentIndex_ = (currentIndex_ + 1) % totalReducers_;
+
+		return new Pair<Tuple, Integer>(t, redCount);
+	}
+
+	// the second-to-last field of the tuple is read as its memory size (a Long)
+	private long getMemorySize(Tuple t) {
+		try {
+			Long memorySize = (Long) t.get(t.size() - 2);
+			return memorySize;
+		} catch (ExecException e) {
+			throw new RuntimeException(
+					"Unable to retrive the size field from tuple.", e);
+		}
+	}
+
+	// the last field of the tuple is read as its disk size (a Long)
+	private long getDiskSize(Tuple t) {
+		try {
+			Long diskSize = (Long) t.get(t.size() - 1);
+			return diskSize;
+		} catch (ExecException e) {
+			throw new RuntimeException(
+					"Unable to retrive the size field from tuple.", e);
+		}
+	}
+
+	/**
+	 * Tells whether two sample tuples carry the same key. A null tuple counts
+	 * as having no fields; tuples of different sizes never match. Only the
+	 * fields before the last two (the size fields) are compared, one by one.
+	 */
+	private boolean hasSameKey(Tuple t1, Tuple t2) {
+		int size1 = (t1 == null) ? 0 : t1.size();
+		int size2 = (t2 == null) ? 0 : t2.size();
+		if (size1 != size2) {
+			return false;
+		}
+
+		int keyFields = size1 - 2;
+		try {
+			for (int field = 0; field < keyFields; field++) {
+				if (DataType.compare(t1.get(field), t2.get(field)) != 0) {
+					return false;
+				}
+			}
+		} catch (ExecException e) {
+			throw new RuntimeException("Unable to compare tuples", e);
+		}
+
+		return true;
+	}
+
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java Thu Jul 30 00:30:25 2009
@@ -15,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.builtin;
-
-import java.io.IOException;
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
 import java.util.Map;
 
 import org.apache.pig.ExecType;
@@ -26,20 +26,22 @@
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-
+
+
 /**
  * A loader that samples the data.  This loader can subsume loader that
  * can handle starting in the middle of a record.  Loaders that can
  * handle this should implement the SamplableLoader interface.
  */
 public class RandomSampleLoader implements LoadFunc {
-    
+    
     private int numSamples;
-    private long skipInterval;
+    private long skipInterval;    
+	private TupleFactory factory;
     private SamplableLoader loader;
     
     /**
@@ -54,21 +56,27 @@
         loader = (SamplableLoader)PigContext.instantiateFuncFromSpec(funcSpec);
         numSamples = Integer.valueOf(ns);
     }
-    
-    @Override
-    public void bindTo(String fileName, BufferedPositionedInputStream is, long offset, long end) throws IOException {
-        skipInterval = (end - offset)/numSamples;
+    
+
+    public void bindTo(String fileName, BufferedPositionedInputStream is, long offset, long end) throws IOException {        
+    	skipInterval = (end - offset)/numSamples;
         loader.bindTo(fileName, is, offset, end);
-    }
-    
-    @Override
-    public Tuple getNext() throws IOException {
+    }
+    
+
+    public Tuple getNext() throws IOException {
         long initialPos = loader.getPosition();
-        Tuple t = loader.getSampledTuple();
+        
+        // make sure we move to a boundary of a record
+        Tuple t = loader.getSampledTuple();        
+        long middlePos = loader.getPosition();
+        
+        // we move to the next boundary
+        t = loader.getSampledTuple();        
         long finalPos = loader.getPosition();
-        
-        long toSkip = skipInterval - (finalPos - initialPos);
-        if (toSkip > 0) {
+        
+        long toSkip = skipInterval - (finalPos - initialPos);
+        if (toSkip > 0) {
             long rc = loader.skip(toSkip);
             
             // if we did not skip enough
@@ -84,13 +92,31 @@
                 }
                 remainingSkip -= rc;
             }
-        }
-        return t;
-    }
-    
+        }       
+        
+        if (t == null) {
+        	return null;
+        }
+        
+        if (factory == null) {
+        	factory = TupleFactory.getInstance();
+        }
+
+        // copy existing field 
+        Tuple m = factory.newTuple(t.size()+1);
+        for(int i=0; i<t.size(); i++) {
+        	m.set(i, t.get(i));
+        }
+        
+        // add size of the tuple at the end
+        m.set(t.size(), (finalPos-middlePos));
+        
+        return m;
+    }
+    
     public Integer bytesToInteger(byte[] b) throws IOException {
         return loader.bytesToInteger(b);
-    }
+    }
 
     public Long bytesToLong(byte[] b) throws IOException {
         return loader.bytesToLong(b);
@@ -130,4 +156,4 @@
             DataStorage storage) throws IOException {
         return loader.determineSchema(fileName, execType, storage);
     }
-}
\ No newline at end of file
+}

Added: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/TupleSize.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/TupleSize.java?rev=799141&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/TupleSize.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/TupleSize.java Thu Jul 30 00:30:25 2009
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * UDF used by skewed join that reports the sizes of a tuple as a
+ * two-field tuple.
+ */
+
+public class TupleSize extends EvalFunc<Tuple>{
+
+    private TupleFactory factory;
+
+    public TupleSize() {
+        factory = TupleFactory.getInstance();
+    }
+
+    /**
+     * Builds the (memory size, disk size) pair for the input tuple.
+     *
+     * @param in the tuple to measure; its last field is copied unchanged
+     *           into the result as the disk size (presumably appended by the
+     *           sampling loader - verify against the caller)
+     * @return a tuple whose field 0 is in.getMemorySize() and whose field 1
+     *         is the last field of the input, or null for a null input
+     */
+    public Tuple exec(Tuple in) throws IOException {
+        if (in == null) {
+            return null;
+        }
+
+        Tuple sizes = factory.newTuple(2);
+        sizes.set(0, in.getMemorySize());
+        int lastField = in.size() - 1;
+        sizes.set(1, in.get(lastField));
+        return sizes;
+    }
+
+    /** The result of this UDF is always a tuple. */
+    public Type getReturnType(){
+        return Tuple.class;
+    }
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Thu Jul 30 00:30:25 2009
@@ -28,6 +28,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Stack;
 import java.util.Properties ;
@@ -42,6 +43,7 @@
 import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SliceWrapper;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.WrappedIOException;
@@ -151,11 +153,19 @@
     public static InputStream openDFSFile(String fileName) throws IOException {
         SliceWrapper wrapper = PigInputFormat.getActiveSplit();
 
-        if (wrapper == null)
+        JobConf conf = null;
+        if (wrapper == null) {
+        	conf = PigMapReduce.sJobConf;
+        }else{
+        	conf = wrapper.getJobConf();
+        }
+        
+        if (conf == null) {
             throw new RuntimeException(
                     "can't open DFS file while executing locally");
+        }
         
-        return openDFSFile(fileName, ConfigurationUtil.toProperties(wrapper.getJobConf()));
+        return openDFSFile(fileName, ConfigurationUtil.toProperties(conf));
 
     }
 
@@ -165,6 +175,42 @@
         return openDFSFile(elem);
     }
     
+    /**
+     * Returns the size of the given file using the job configuration of the
+     * active split or, when there is no active split, the one held by
+     * PigMapReduce.
+     *
+     * @throws RuntimeException if neither source provides a configuration
+     */
+    public static long getSize(String fileName) throws IOException {
+        SliceWrapper activeSplit = PigInputFormat.getActiveSplit();
+        JobConf conf = (activeSplit != null)
+                ? activeSplit.getJobConf()
+                : PigMapReduce.sJobConf;
+
+        if (conf == null) {
+            throw new RuntimeException(
+                "can't open DFS file while executing locally");
+        }
+
+        return getSize(fileName, ConfigurationUtil.toProperties(conf));
+    }
+    
+    /**
+     * Sums the lengths reported for every element that
+     * getFileElementDescriptors returns for the given name (presumably all
+     * files under that path - verify against that helper).
+     *
+     * @param fileName name of the file or path to measure
+     * @param properties properties used to create the data storage
+     * @return the summed length of all elements found
+     */
+    public static long getSize(String fileName, Properties properties) throws IOException {
+        DataStorage storage = new HDataStorage(properties);
+        ElementDescriptor root = storage.asElement(fileName);
+
+        long total = 0;
+        for (ElementDescriptor file : getFileElementDescriptors(root)) {
+            Map<String, Object> stats = file.getStatistics();
+            total += (Long) (stats.get(ElementDescriptor.LENGTH_KEY));
+        }
+
+        return total;
+    }
+    
     private static InputStream openDFSFile(ElementDescriptor elem) throws IOException{
         ElementDescriptor[] elements = null;
         

Added: hadoop/pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java?rev=799141&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java Thu Jul 30 00:30:25 2009
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Adaptor around a PigNullableWritable key that additionally carries a
+ * partition index. Comparison, hashing, null handling and the index are all
+ * delegated to the wrapped key. The partition index is not part of the
+ * serialized form: write() emits only the key's class name and the key.
+ */
+public class NullablePartitionWritable extends PigNullableWritable{
+	/** Partition this key is assigned to; kept in memory only. */
+	private byte partitionIndex;
+	/** The wrapped key that all other operations delegate to. */
+	private PigNullableWritable key;
+
+	/** Creates an empty instance; the key is set later or by readFields. */
+	public NullablePartitionWritable() {
+
+	}
+
+	/** Creates an instance wrapping the given key. */
+	public NullablePartitionWritable(PigNullableWritable k) {
+		setKey(k);
+	}
+
+	public void setKey(PigNullableWritable k) {
+		this.key = k;
+	}
+
+	public PigNullableWritable getKey() {
+		return this.key;
+	}
+
+	public void setPartition(byte n) {
+		this.partitionIndex = n;
+	}
+
+	public byte getPartition() {
+		return this.partitionIndex;
+	}
+
+	/**
+	 * Compares the wrapped keys only; the partition index is ignored.
+	 * The argument must be a NullablePartitionWritable.
+	 */
+  	public int compareTo(Object o) {
+		NullablePartitionWritable other = (NullablePartitionWritable) o;
+		return key.compareTo(other.getKey());
+	}
+
+	/**
+	 * Reads the key's class name, instantiates that class through its
+	 * no-argument constructor and lets the new key read its own fields.
+	 */
+	public void readFields(DataInput in) throws IOException {
+		String keyClassName = in.readUTF();
+		try{
+			key = (PigNullableWritable)Class.forName(keyClassName).newInstance();
+		}catch(Exception e) {
+			throw new IOException(e);
+		}
+		key.readFields(in);
+	}
+
+	/** Writes the key's class name followed by the key itself. */
+	public void write(DataOutput out) throws IOException {
+		String keyClassName = key.getClass().getName();
+		out.writeUTF(keyClassName);
+		key.write(out);
+	}
+
+	// --- everything below simply forwards to the wrapped key ---
+
+	public boolean isNull() {
+		return key.isNull();
+	}
+
+	public void setNull(boolean isNull) {
+		key.setNull(isNull);
+	}
+
+	public byte getIndex() {
+		return key.getIndex();
+	}
+
+	public void setIndex(byte index) {
+		key.setIndex(index);
+	}
+
+	public Object getValueAsPigType() {
+		return key.getValueAsPigType();
+	}
+
+	public int hashCode() {
+		return key.hashCode();
+	}
+
+	public String toString() {
+		return "Partition: " + partitionIndex + " " + key.toString();
+	}
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java Thu Jul 30 00:30:25 2009
@@ -90,6 +90,9 @@
         else if(op instanceof LOFRJoin){
             return ((LOFRJoin)op).getJoinColPlans();
         }
+        else if(op instanceof LOJoin){
+            return ((LOJoin)op).getJoinPlans();
+        }
         return new MultiMap<LogicalOperator, LogicalPlan>();
     }
 

Added: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java?rev=799141&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java Thu Jul 30 00:30:25 2009
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.pig.PigException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
+import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.plan.RequiredFields;
+import org.apache.pig.impl.plan.ProjectionMap;
+
+public class LOJoin extends LogicalOperator {
+    private static final long serialVersionUID = 2L;
+
+	/**
+	 * Enum for the type of join
+	 */
+	public static enum JOINTYPE {
+								REGULAR, /// Regular join
+								REPLICATED, /// Fragment Replicated join
+								SKEWED /// Skewed Join
+							  };
+
+    /**
+     * LOJoin contains a list of logical operators corresponding to the
+     * relational operators and a list of generates for each relational
+     * operator. Each generate operator in turn contains a list of expressions
+     * for the columns that are projected
+     */
+    private static Log log = LogFactory.getLog(LOJoin.class);
+    private MultiMap<LogicalOperator, LogicalPlan> mJoinPlans;
+
+	private JOINTYPE mJoinType;	// Retains the type of the join
+
+    /**
+     * 
+     * @param plan
+     *            LogicalPlan this operator is a part of.
+     * @param k
+     *            OperatorKey for this operator
+     * @param joinPlans
+     *            the join columns
+     * @param jt 
+     *            indicates the type of join - regular, skewed or fragment replicated
+     */
+    public LOJoin(
+            LogicalPlan plan,
+            OperatorKey k,
+            MultiMap<LogicalOperator, LogicalPlan> joinPlans,
+            JOINTYPE jt) {
+        super(plan, k);
+        mJoinPlans = joinPlans;
+		mJoinType = jt;
+    }
+
+    public List<LogicalOperator> getInputs() {
+        return mPlan.getPredecessors(this);
+    }
+
+    public MultiMap<LogicalOperator, LogicalPlan> getJoinPlans() {
+        return mJoinPlans;
+    }    
+
+    /**
+     * Replaces the join plans of this operator.
+     *
+     * @param joinPlans the new mapping from each input operator to its join plans
+     */
+    public void setJoinPlans(MultiMap<LogicalOperator, LogicalPlan> joinPlans) {
+        mJoinPlans = joinPlans;
+    }
+
+	/**
+     * Returns the type of join.
+     */
+	public JOINTYPE getJoinType() {
+		return mJoinType;
+	}
+
+    @Override
+    public String name() {
+        return "LOJoin " + mKey.scope + "-" + mKey.id;
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    /**
+     * Computes (once) and returns the output schema of the join: the fields
+     * of every input, in input order. Each field is named
+     * "&lt;input alias&gt;::&lt;field alias&gt;"; a field whose alias occurs
+     * only once across all inputs gets its plain alias back. An input
+     * without a schema contributes a single unnamed bytearray field, and a
+     * field without an alias stays unnamed.
+     *
+     * @return the (cached) schema of this operator
+     * @throws FrontendException if the schema of an input cannot be obtained;
+     *             the cached schema is reset in that case
+     */
+    @Override
+    public Schema getSchema() throws FrontendException {
+        List<LogicalOperator> inputs = mPlan.getPredecessors(this);
+        mType = DataType.BAG;//mType is from the super class
+        if(!mIsSchemaComputed){
+            // alias -> position in fss of its only occurrence so far, or -1
+            // once the alias has been seen more than once
+            Hashtable<String, Integer> nonDuplicates = new Hashtable<String, Integer>();
+            List<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>();
+            for (LogicalOperator op : inputs) {
+                try {
+                    Schema cSchema = op.getSchema();
+                    if(cSchema!=null){
+                        for (FieldSchema schema : cSchema.getFields()) {
+                            FieldSchema newFS;
+                            if (schema.alias != null) {
+                                if (nonDuplicates.containsKey(schema.alias)) {
+                                    nonDuplicates.put(schema.alias, -1);
+                                } else {
+                                    // record the real position in fss; a
+                                    // separate counter drifts as soon as an
+                                    // input without a schema adds its
+                                    // placeholder field
+                                    nonDuplicates.put(schema.alias, fss.size());
+                                }
+                                newFS = new FieldSchema(op.getAlias()+"::"+schema.alias,schema.schema,schema.type);
+                            } else {
+                                // Hashtable rejects null keys, and an unnamed
+                                // field has nothing to disambiguate anyway
+                                newFS = new FieldSchema(null,schema.schema,schema.type);
+                            }
+                            newFS.setParent(schema.canonicalName, op);
+                            fss.add(newFS);
+                        }
+                    }
+                    else
+                        fss.add(new FieldSchema(null,DataType.BYTEARRAY));
+                } catch (FrontendException ioe) {
+                    mIsSchemaComputed = false;
+                    mSchema = null;
+                    throw ioe;
+                }
+            }
+            mIsSchemaComputed = true;
+            // give unambiguous fields their short alias back
+            // NOTE(review): the replacement FieldSchema does not get
+            // setParent() called on it - confirm that this is intended
+            for (Entry<String, Integer> ent : nonDuplicates.entrySet()) {
+                int ind = ent.getValue();
+                if(ind==-1) continue;
+                FieldSchema prevSch = fss.get(ind);
+                fss.set(ind, new FieldSchema(ent.getKey(),prevSch.schema,prevSch.type));
+            }
+            mSchema = new Schema(fss);
+        }
+        return mSchema;
+    }
+
+    /**
+     * Tells whether the join key consists of more than one column, judged by
+     * the number of join plans registered for the first input.
+     *
+     * @throws AssertionError if this operator has no input yet
+     */
+    public boolean isTupleJoinCol() {
+        List<LogicalOperator> predecessors = mPlan.getPredecessors(this);
+        boolean hasInput = predecessors != null && predecessors.size() != 0;
+        if (!hasInput) {
+            throw new AssertionError("LOJoin.isTupleJoinCol() can only becalled "
+                                     + "after it has an input ") ;
+        }
+        Collection<LogicalPlan> firstInputPlans = mJoinPlans.get(predecessors.get(0));
+        return firstInputPlans.size() > 1 ;
+    }
+
+    @Override
+    public void visit(LOVisitor v) throws VisitorException {
+        v.visit(this);
+    }
+
+    /**
+     * Re-keys the join plans: the inner plans registered for oldOp are
+     * removed and registered for newOp instead. Useful when the structure of
+     * the LogicalPlan changes.
+     *
+     * @param oldOp the old operator
+     * @param newOp the new operator
+     */
+    public void switchJoinColPlanOp(LogicalOperator oldOp,
+                                    LogicalOperator newOp) {
+        mJoinPlans.put(newOp, mJoinPlans.removeKey(oldOp));
+    }
+
+    /**
+     * Runs a SchemaRemover over every join plan of every input and then
+     * unsets the schema of this operator itself.
+     */
+    public void unsetSchema() throws VisitorException{
+        for(LogicalOperator input: getInputs()) {
+            Collection<LogicalPlan> plansForInput = mJoinPlans.get(input);
+            if(plansForInput == null) {
+                continue;
+            }
+            for(LogicalPlan innerPlan : plansForInput) {
+                new SchemaRemover(innerPlan).visit();
+            }
+        }
+        super.unsetSchema();
+    }
+
+    /**
+     * Computes the merged type of the join column over all inputs.
+     * Only usable when the join column is atomic, i.e. every input
+     * contributes exactly one join plan.
+     *
+     * @return the type obtained by merging the output types of the join
+     *         plans of all inputs, starting from bytearray
+     * @throws FrontendException if the join column is a tuple (code 1010)
+     *         or an input does not have exactly one join plan (code 1012)
+     */
+    public byte getAtomicJoinColType() throws FrontendException {
+        if (isTupleJoinCol()) {
+            throw new FrontendException(
+                    "getAtomicJoinColType is used only when"
+                    + " dealing with atomic group col",
+                    1010, PigException.INPUT, false, null) ;
+        }
+
+        // fold the output type of each input's single join plan into
+        // one common type
+        byte mergedType = DataType.BYTEARRAY ;
+        for (LogicalOperator joinInput : getInputs()) {
+            List<LogicalPlan> plansForInput
+                        = new ArrayList<LogicalPlan>(getJoinPlans().get(joinInput)) ;
+            if (plansForInput.size() != 1) {
+                throw new FrontendException(
+                        "Each COGroup input has to have "
+                        + "the same number of inner plans",
+                        1012, PigException.INPUT, false, null) ;
+            }
+            byte planType = plansForInput.get(0).getSingleLeafPlanOutputType() ;
+            mergedType = DataType.mergeType(mergedType, planType) ;
+        }
+
+        return mergedType ;
+    }
+
+    /*
+        Builds the schema of the tuple join key: one field per join plan of
+        the first input, with no alias, whose type is the merge of the
+        output types of the plans at that position across all inputs.
+
+        Throws FrontendException with code 1011 when the join column is not
+        a tuple, and with code 1013 when the leaf of any join plan is a
+        project-star.
+
+        This implementation is based on the assumption that all the
+        inputs have the same join col tuple arity.
+     */
+    public Schema getTupleJoinSchema() throws FrontendException {
+        if (!isTupleJoinCol()) {
+            int errCode = 1011;
+            String msg = "getTupleJoinSchema is used only when"
+                + " dealing with tuple join col";
+            throw new FrontendException(msg, errCode, PigException.INPUT, false, null) ;
+        }
+
+        // this fsList represents all the columns in group tuple
+        List<Schema.FieldSchema> fsList = new ArrayList<Schema.FieldSchema>() ;
+
+        // the number of output fields is taken from the first input only
+        int outputSchemaSize = getJoinPlans().get(getInputs().get(0)).size() ;
+
+        // by default, they are all bytearray
+        // for type checking, we don't care about aliases
+        for(int i=0; i<outputSchemaSize; i++) {
+            fsList.add(new Schema.FieldSchema(null, DataType.BYTEARRAY)) ;
+        }
+
+        // merge all the inner plan outputs so we know what type
+        // our group column should be
+        for(int i=0;i < getInputs().size(); i++) {
+            LogicalOperator input = getInputs().get(i) ;
+            List<LogicalPlan> innerPlans
+                        = new ArrayList<LogicalPlan>(getJoinPlans().get(input)) ;
+
+            boolean seenProjectStar = false;
+            for(int j=0;j < innerPlans.size(); j++) {
+                byte innerType = innerPlans.get(j).getSingleLeafPlanOutputType() ;
+                ExpressionOperator eOp = (ExpressionOperator)innerPlans.get(j).getSingleLeafPlanOutputOp();
+
+                // remember a project-star leaf; it is rejected after the loop
+                if(eOp instanceof LOProject) {
+                    if(((LOProject)eOp).isStar()) {
+                        seenProjectStar = true;
+                    }
+                }
+                        
+                // fsList.get(j) relies on the equal-arity assumption stated above
+                Schema.FieldSchema groupFs = fsList.get(j);
+                groupFs.type = DataType.mergeType(groupFs.type, innerType) ;
+                // record the plan's leaf as parent of the key field, with its
+                // canonical name when the leaf has a field schema
+                Schema.FieldSchema fs = eOp.getFieldSchema();
+                if(null != fs) {
+                    groupFs.setParent(eOp.getFieldSchema().canonicalName, eOp);
+                } else {
+                    groupFs.setParent(null, eOp);
+                }
+            }
+
+            // this method only runs for tuple join keys, so any star seen
+            // here is reported as mixing star with a list of expressions
+            if(seenProjectStar) {
+                int errCode = 1013;
+                String msg = "Grouping attributes can either be star (*) or a list of expressions, but not both.";
+                throw new FrontendException(msg, errCode, PigException.INPUT, false, null);                
+            }
+
+        }
+
+        return new Schema(fsList) ;
+    }
+
+    /**
+     * Clones this join together with its join plans: the clone gets its own
+     * plan map with the same input operators as keys and a cloned copy of
+     * every join plan as values.
+     * Do not use the clone method directly. Operators are cloned when logical plans
+     * are cloned using {@link LogicalPlanCloner}
+     * @see org.apache.pig.impl.logicalLayer.LogicalOperator#clone()
+     */
+    @Override
+    protected Object clone() throws CloneNotSupportedException {
+        // start from the LogicalOperator clone
+        LOJoin copy = (LOJoin) super.clone();
+
+        // give the copy its own map, filled with clones of the join plans
+        copy.mJoinPlans = new MultiMap<LogicalOperator, LogicalPlan>();
+        for (LogicalOperator input : mJoinPlans.keySet()) {
+            for (LogicalPlan originalPlan : mJoinPlans.get(input)) {
+                LogicalPlanCloneHelper helper = new LogicalPlanCloneHelper(originalPlan);
+                copy.mJoinPlans.put(input, helper.getClonedPlan());
+            }
+        }
+
+        return copy;
+    }
+
+    /**
+     * Returns the projection map of this join, computing it on the first
+     * call and returning the remembered result afterwards.
+     *
+     * Output columns are numbered by walking the predecessors in order and,
+     * within each predecessor, its schema columns in order; each output
+     * column is mapped to its (input number, input column) pair.
+     *
+     * @return the projection map, or null when the output schema, the list
+     *         of predecessors or the schema of any input is not available
+     */
+    @Override
+    public ProjectionMap getProjectionMap() {
+
+        if(mIsProjectionMapComputed) return mProjectionMap;
+        mIsProjectionMapComputed = true;
+
+        Schema outputSchema;
+
+        try {
+            outputSchema = getSchema();
+        } catch (FrontendException fee) {
+            mProjectionMap = null;
+            return mProjectionMap;
+        }
+
+        if(outputSchema == null) {
+            mProjectionMap = null;
+            return mProjectionMap;
+        }
+
+        // getPredecessors is already typed as a List; no downcast to
+        // ArrayList, which would tie this code to one List implementation
+        List<LogicalOperator> predecessors = mPlan.getPredecessors(this);
+        if(predecessors == null) {
+            mProjectionMap = null;
+            return mProjectionMap;
+        }
+
+        MultiMap<Integer, ProjectionMap.Column> mapFields = new MultiMap<Integer, ProjectionMap.Column>();
+        List<Integer> addedFields = new ArrayList<Integer>();
+        boolean anyUnknownInputSchema = false;
+        int outputColumnNum = 0;
+
+        for(int inputNum = 0; inputNum < predecessors.size(); ++inputNum) {
+            LogicalOperator predecessor = predecessors.get(inputNum);
+            Schema inputSchema = null;
+
+            try {
+                inputSchema = predecessor.getSchema();
+            } catch (FrontendException fee) {
+                mProjectionMap = null;
+                return mProjectionMap;
+            }
+
+            if(inputSchema == null) {
+                // an input without schema takes one output column and is
+                // recorded as an added field
+                outputColumnNum++;
+                addedFields.add(inputNum);
+                anyUnknownInputSchema = true;
+            } else {
+                for(int inputColumn = 0; inputColumn < inputSchema.size(); ++inputColumn) {
+                    mapFields.put(outputColumnNum++,
+                            new ProjectionMap.Column(new Pair<Integer, Integer>(inputNum, inputColumn)));
+                }
+            }
+        }
+
+        //TODO
+        /*
+         * For now, if there is any input that has an unknown schema
+         * flag it and return a null ProjectionMap.
+         * In the future, when unknown schemas are handled
+         * mark inputs that have unknown schemas as output columns
+         * that have been added.
+         */
+
+        if(anyUnknownInputSchema) {
+            mProjectionMap = null;
+            return mProjectionMap;
+        }
+
+        if(addedFields.size() == 0) {
+            addedFields = null;
+        }
+
+        mProjectionMap = new ProjectionMap(mapFields, null, addedFields);
+        return mProjectionMap;
+    }
+
+    /**
+     * Lists, per input, the fields of that input that the join plans use.
+     *
+     * For each predecessor the result holds one entry: "all fields" when
+     * any of its join plans reports a project-star set, the
+     * {@code RequiredFields(false, true)} value when its plans project no
+     * column at all, and otherwise the (input number, column) pairs of the
+     * projected columns.
+     *
+     * @return one entry per predecessor; null when there are no
+     *         predecessors; a list holding a single null element when
+     *         visiting a join plan fails
+     */
+    @Override
+    public List<RequiredFields> getRequiredFields() {
+        List<LogicalOperator> inputs = mPlan.getPredecessors(this);
+        if (inputs == null) {
+            return null;
+        }
+
+        List<RequiredFields> result = new ArrayList<RequiredFields>();
+
+        for (int inputIdx = 0; inputIdx < inputs.size(); ++inputIdx) {
+            Set<LOProject> projects = new HashSet<LOProject>();
+            boolean sawStar = false;
+
+            // collect the top level projects of every join plan of this input
+            for (LogicalPlan joinPlan : this.getJoinPlans().get(inputs.get(inputIdx))) {
+                TopLevelProjectFinder finder = new TopLevelProjectFinder(joinPlan);
+                try {
+                    finder.visit();
+                } catch (VisitorException ve) {
+                    // give up: the answer collapses to a single null entry
+                    result.clear();
+                    result.add(null);
+                    return result;
+                }
+                projects.addAll(finder.getProjectSet());
+                if (finder.getProjectStarSet() != null) {
+                    sawStar = true;
+                }
+            }
+
+            if (sawStar) {
+                result.add(new RequiredFields(true));
+                continue;
+            }
+
+            Set<Pair<Integer, Integer>> columns = new HashSet<Pair<Integer, Integer>>();
+            for (LOProject project : projects) {
+                for (int column : project.getProjection()) {
+                    columns.add(new Pair<Integer, Integer>(inputIdx, column));
+                }
+            }
+
+            if (columns.size() == 0) {
+                result.add(new RequiredFields(false, true));
+            } else {
+                result.add(new RequiredFields(new ArrayList<Pair<Integer, Integer>>(columns)));
+            }
+        }
+
+        if (result.size() == 0) {
+            return null;
+        }
+        return result;
+    }
+
+    /*
+     * After the superclass rewiring, fixes up the projects inside the join
+     * plans that were keyed by the old predecessor and moves those plans
+     * under the new predecessor.
+     *
+     * @see org.apache.pig.impl.plan.Operator#rewire(org.apache.pig.impl.plan.Operator, org.apache.pig.impl.plan.Operator)
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void rewire(Operator oldPred, int oldPredIndex, Operator newPred, boolean useOldPred) throws PlanException {
+        super.rewire(oldPred, oldPredIndex, newPred, useOldPred);
+        LogicalOperator previous = (LogicalOperator) oldPred;
+        LogicalOperator current = (LogicalOperator) newPred;
+
+        // walk a snapshot of the keys because the map is modified in the loop
+        Set<LogicalOperator> keySnapshot = new HashSet<LogicalOperator>(mJoinPlans.keySet());
+        for (LogicalOperator joinInput : keySnapshot) {
+            if (!joinInput.equals(previous)) {
+                continue;
+            }
+
+            // point the projects of every plan of this input at the new predecessor
+            for (LogicalPlan joinPlan : mJoinPlans.get(joinInput)) {
+                try {
+                    new ProjectFixerUpper(
+                            joinPlan, previous, oldPredIndex, current, useOldPred, this).visit();
+                } catch (VisitorException ve) {
+                    throw new PlanException(
+                            "Problem while fixing project inputs during rewiring.",
+                            2144, PigException.BUG, ve);
+                }
+            }
+
+            // move the plans from the old key to the new key
+            List<LogicalPlan> movedPlans = (List<LogicalPlan>)mJoinPlans.get(previous);
+            mJoinPlans.removeKey(previous);
+            mJoinPlans.put(current, movedPlans);
+        }
+    }
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOPrinter.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOPrinter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOPrinter.java Thu Jul 30 00:30:25 2009
@@ -161,6 +161,15 @@
                     }
                 }
             }
+            else if(node instanceof LOJoin){
+                MultiMap<LogicalOperator, LogicalPlan> plans = ((LOJoin)node).getJoinPlans();
+                for (LogicalOperator lo : plans.keySet()) {
+                    // Visit the associated plans
+                    for (LogicalPlan plan : plans.get(lo)) {
+                        sb.append(planString(plan));
+                    }
+                }
+            }
             else if(node instanceof LOSort){
                 sb.append(planString(((LOSort)node).getSortColPlans())); 
             }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOVisitor.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOVisitor.java Thu Jul 30 00:30:25 2009
@@ -22,6 +22,7 @@
 import java.util.Set;
 import java.util.Map;
 import java.util.ArrayList;
+import java.util.Collection;
 
 import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.PlanWalker;
@@ -154,6 +155,32 @@
         }
     }
 
+
+    /**
+     * Walks every join plan attached to each input of the given join.
+     * @param loj 
+     *            the logical join operator that has to be visited
+     * @throws VisitorException
+     */
+	@SuppressWarnings("unchecked")
+    protected void visit(LOJoin loj) throws VisitorException {
+        // Walk the join plans of each input of the join.
+        MultiMap<LogicalOperator, LogicalPlan> mapJoinPlans = loj.getJoinPlans();
+        for(LogicalOperator op: loj.getInputs()) {
+            for(LogicalPlan lp: mapJoinPlans.get(op)) {
+                if (null != lp) {
+                    // TODO FIX - How do we know this should be a
+                    // DependencyOrderWalker?  We should be replicating the
+                    // walker the current visitor is using.
+                    PlanWalker w = new DependencyOrderWalker(lp);
+                    pushWalker(w);
+                    w.walk(this);
+                    popWalker();
+                }
+            }
+        }
+    }
+
     /**
      * 
      * @param forEach

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/PlanSetter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/PlanSetter.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/PlanSetter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/PlanSetter.java Thu Jul 30 00:30:25 2009
@@ -63,6 +63,11 @@
         super.visit(op);
     }
 
+    // Sets the join's plan to the plan of the current walker, then lets the
+    // superclass visit the join.
+    public void visit(LOJoin op) throws VisitorException {
+        op.setPlan(mCurrentWalker.getPlan());
+        super.visit(op);
+    }
+
     public void visit(LOConst op) throws VisitorException {
         op.setPlan(mCurrentWalker.getPlan());
     }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java Thu Jul 30 00:30:25 2009
@@ -101,6 +101,32 @@
         }
     }
 
+    /*
+     * For every input of the join, replaces each join plan that contains a
+     * project-star with the plans produced by translating it; plans
+     * without a project-star are kept as they are.
+     *
+     * @see org.apache.pig.impl.logicalLayer.LOVisitor#visit(org.apache.pig.impl.logicalLayer.LOJoin)
+     */
+    @Override
+    protected void visit(LOJoin frj) throws VisitorException {
+        // the plan map of the join is updated in place
+        MultiMap<LogicalOperator, LogicalPlan> plansByInput = frj.getJoinPlans();
+
+        for (LogicalOperator input : frj.getInputs()) {
+            ArrayList<LogicalPlan> rewritten = new ArrayList<LogicalPlan>();
+            for (LogicalPlan candidate : plansByInput.get(input)) {
+                if (!checkPlanForProjectStar(candidate)) {
+                    rewritten.add(candidate);
+                    continue;
+                }
+                ArrayList<LogicalPlan> expanded = translateProjectStarInPlan(candidate);
+                rewritten.addAll(expanded);
+            }
+            // swap the old plan list of this input for the rewritten one
+            plansByInput.removeKey(input);
+            plansByInput.put(input, rewritten);
+        }
+    }
+
     /**
      * 
      * @param forEach

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java Thu Jul 30 00:30:25 2009
@@ -31,6 +31,7 @@
 import org.apache.pig.impl.plan.optimizer.Transformer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.LOFRJoin;
+import org.apache.pig.impl.logicalLayer.LOJoin;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.LOCogroup;
@@ -135,6 +136,10 @@
             LOFRJoin frj = (LOFRJoin) before ;
             frj.switchJoinColPlanOp(after, newNode);
         }
+        if (before instanceof LOJoin) {
+            LOJoin frj = (LOJoin) before ;
+            frj.switchJoinColPlanOp(after, newNode);
+        }
 
         // Visit all the inner plans of before and change their projects to
         // connect to newNode instead of after.
@@ -144,6 +149,8 @@
             plans.addAll((((LOCogroup)before).getGroupByPlans()).values());
         } else if (before instanceof LOFRJoin) {
             plans.addAll((((LOFRJoin)before).getJoinColPlans()).values());
+        } else if (before instanceof LOJoin) {
+            plans.addAll((((LOJoin)before).getJoinPlans()).values());
         }
         else if (before instanceof LOSort) {
             plans.addAll(((LOSort)before).getSortColPlans());

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java Thu Jul 30 00:30:25 2009
@@ -37,6 +37,7 @@
 import org.apache.pig.impl.logicalLayer.LOSplitOutput;
 import org.apache.pig.impl.logicalLayer.LOUnion;
 import org.apache.pig.impl.logicalLayer.LOFRJoin;
+import org.apache.pig.impl.logicalLayer.LOJoin;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -137,7 +138,7 @@
             // Limit cannot be pushed up
             if (predecessor instanceof LOCogroup || predecessor instanceof LOFilter ||
             		predecessor instanceof LOLoad || predecessor instanceof LOSplit ||
-            		predecessor instanceof LOSplitOutput || predecessor instanceof LODistinct || predecessor instanceof LOFRJoin)
+            		predecessor instanceof LOSplitOutput || predecessor instanceof LODistinct || predecessor instanceof LOFRJoin || predecessor instanceof LOJoin)
             {
             	return;
             }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java Thu Jul 30 00:30:25 2009
@@ -211,4 +211,8 @@
         super.visit(frj);
     }
     
+    // Unsets the schema of the join, then lets the superclass visit it.
+    protected void visit(LOJoin frj) throws VisitorException {
+        frj.unsetSchema();
+        super.visit(frj);
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Jul 30 00:30:25 2009
@@ -299,6 +299,63 @@
 	}
 	
 	/**
+	 * Join parser. Builds an LOJoin of the given join type over the given
+	 * inputs and connects every input to it in the logical plan.
+	 * A skewed join in local mode is handed to rewriteJoin instead, and a
+	 * skewed join must have exactly two inputs.
+	 *
+	 * @param gis the join inputs with their join key plans
+	 * @param lp the logical plan the join is added to
+	 * @param jt the requested join type
+	 * @return the join operator, or the result of rewriteJoin for a skewed
+	 *         join in local mode
+	 * @throws ParseException if a skewed join does not have exactly two
+	 *         inputs, or the inputs differ in the number of join key plans
+	 */
+	LogicalOperator parseJoin(ArrayList<CogroupInput> gis, LogicalPlan lp, LOJoin.JOINTYPE jt) throws ParseException, PlanException{
+		log.trace("Entering parseJoin");
+		// Skewed Join behaves as regular join in local mode
+		if (pigContext.getExecType() == ExecType.LOCAL && jt == LOJoin.JOINTYPE.SKEWED) {
+			return rewriteJoin(gis,lp);
+		}
+
+		int n = gis.size();
+
+		if (jt == LOJoin.JOINTYPE.SKEWED && n != 2) {
+			throw new ParseException("Skewed join can only be applied for 2-way joins");
+		}
+
+		ArrayList<LogicalOperator> los = new ArrayList<LogicalOperator>();
+		MultiMap<LogicalOperator, LogicalPlan> joinPlans = new MultiMap<LogicalOperator, LogicalPlan>();
+
+		// every input must have as many join key plans as the first one
+		int arity = gis.get(0).plans.size();
+
+		for (int i = 0; i < n ; i++){
+
+			CogroupInput gi = gis.get(i);
+			los.add(gi.op);
+			ArrayList<LogicalPlan> planList = gi.plans;
+			int numJoinOps = planList.size();
+			log.debug("Number of join operators = " + numJoinOps);
+
+			if(arity != numJoinOps) {
+				throw new ParseException("The arity of the join columns do not match.");
+			}
+			for(int j = 0; j < numJoinOps; ++j) {
+				joinPlans.put(gi.op, planList.get(j));
+				for(LogicalOperator root: planList.get(j).getRoots()) {
+					log.debug("Join input plan root: " + root);
+				}
+			}
+		}
+
+		LogicalOperator loj  = new LOJoin(lp, new OperatorKey(scope, getNextId()), joinPlans, jt);
+		lp.add(loj);
+		log.debug("Added operator " + loj.getClass().getName() + " object " + loj + " to the logical plan " + lp);
+
+		for(LogicalOperator op: los) {
+			lp.connect(op, loj);
+			log.debug("Connected operator " + op.getClass().getName() + " to " + loj.getClass().getName() + " in the logical plan");
+		}
+
+		log.trace("Exiting parseJoin");
+		return loj;
+	}
+
+	/**
 	 * Mimicing parseCogroup as the parsing logic for FRJoin remains exactly the same.
 	 */
 	LogicalOperator parseFRJoin(ArrayList<CogroupInput> gis, LogicalPlan lp) throws ParseException, PlanException{
@@ -1855,13 +1912,25 @@
 	log.trace("Entering JoinClause");
 	log.debug("LogicalPlan: " + lp);
 	LogicalOperator frj = null;
+	LogicalOperator skj = null;
 }
 {
 	(gi = GroupItem(lp) { gis.add(gi); }
 	("," gi = GroupItem(lp) { gis.add(gi); })+
 	// The addition of using replicated to indicate FRJoin
-	[<USING> ("\"replicated\"" | "\"repl\"") { frj = parseFRJoin(gis, lp); }] )
-	{log.trace("Exiting JoinClause"); return (frj==null) ? rewriteJoin(gis, lp) : frj;}
+	([<USING> ("\"replicated\"" { frj = parseFRJoin(gis, lp); } | "\"repl\"" { frj=parseFRJoin(gis,lp);}
+    |"\"skewed\"" { skj = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED); })] ))
+
+	{log.trace("Exiting JoinClause");
+	if (frj!=null) {
+		return frj;
+	}
+	else if (skj!=null) {
+		return skj;
+	}
+	else {
+		return rewriteJoin(gis,lp);
+	}}
 	
 }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Thu Jul 30 00:30:25 2009
@@ -2245,6 +2245,131 @@
         }
     }
 
+	/**
+     * LOJoin visitor.
+     *
+     * Regenerates the join schema, type checks every join plan of every
+     * input, inserts casts so that the join keys of all inputs end up with
+     * the same type (the merged atomic type, or per position the types of
+     * the tuple join schema), and finally regenerates the schema again.
+     *
+     * @param frj the join to type check
+     * @throws VisitorException if the schema cannot be resolved, a join
+     *         plan has more than one leaf, or a join key has a type that
+     *         is neither atomic nor tuple
+     */
+    protected void visit(LOJoin frj) throws VisitorException {
+        try {
+            frj.regenerateSchema();
+        } catch (FrontendException fe) {
+            int errCode = 1060;
+            String msg = "Cannot resolve Join output schema" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            throw new TypeCheckerException(msg, errCode, PigException.INPUT, fe) ;
+        }
+
+        MultiMap<LogicalOperator, LogicalPlan> joinColPlans
+                                                    = frj.getJoinPlans() ;
+        List<LogicalOperator> inputs = frj.getInputs() ;
+
+        // Type checking internal plans.
+        for(int i=0;i < inputs.size(); i++) {
+            LogicalOperator input = inputs.get(i) ;
+            List<LogicalPlan> innerPlans
+                        = new ArrayList<LogicalPlan>(joinColPlans.get(input)) ;
+
+            for(int j=0; j < innerPlans.size(); j++) {
+
+                LogicalPlan innerPlan = innerPlans.get(j) ;
+
+                // Check that the inner plan has only 1 output port
+                if (!innerPlan.isSingleLeafPlan()) {
+                    int errCode = 1057;
+                    String msg = "Join's inner plans can only"
+                                 + " have one output (leaf)" ;
+                    msgCollector.collect(msg, MessageType.Error) ;
+                    throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
+                }
+
+                checkInnerPlan(innerPlan) ;
+            }
+        }
+
+        try {
+
+            if (!frj.isTupleJoinCol()) {
+                // merge all the inner plan outputs so we know what type
+                // our group column should be
+
+                // TODO: Don't recompute schema here
+                //byte groupType = schema.getField(0).type ;
+                byte groupType = frj.getAtomicJoinColType() ;
+
+                // go through all inputs again to add cast if necessary
+                for(int i=0;i < inputs.size(); i++) {
+                    LogicalOperator input = inputs.get(i) ;
+                    List<LogicalPlan> innerPlans
+                                = new ArrayList<LogicalPlan>(joinColPlans.get(input)) ;
+                    // Checking innerPlan size already done above
+                    byte innerType = innerPlans.get(0).getSingleLeafPlanOutputType() ;
+                    if (innerType != groupType) {
+                        insertAtomicCastForJoinInnerPlan(innerPlans.get(0),
+                                                            frj,
+                                                            groupType) ;
+                    }
+                }
+            }
+            else {
+
+                // TODO: Don't recompute schema here
+                //Schema groupBySchema = schema.getField(0).schema ;
+                Schema groupBySchema = frj.getTupleJoinSchema() ;
+
+                // go through all inputs again to add cast if necessary
+                for(int i=0;i < inputs.size(); i++) {
+                    LogicalOperator input = inputs.get(i) ;
+                    List<LogicalPlan> innerPlans
+                                = new ArrayList<LogicalPlan>(joinColPlans.get(input)) ;
+                    for(int j=0;j < innerPlans.size(); j++) {
+                        LogicalPlan innerPlan = innerPlans.get(j) ;
+                        byte innerType = innerPlan.getSingleLeafPlanOutputType() ;
+                        byte expectedType = DataType.BYTEARRAY ;
+
+                        // only atomic and tuple typed join keys are accepted
+                        if (!DataType.isAtomic(innerType) && (DataType.TUPLE != innerType)) {
+                            int errCode = 1057;
+                            // the leading space keeps "only" and "have" apart
+                            String msg = "Join's inner plans can only"
+                                         + " have one output (leaf)" ;
+                            msgCollector.collect(msg, MessageType.Error) ;
+                            throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
+                        }
+
+                        try {
+                            expectedType = groupBySchema.getField(j).type ;
+                        }
+                        catch(FrontendException fee) {
+                            int errCode = 1060;
+                            String msg = "Cannot resolve Join output schema" ;
+                            msgCollector.collect(msg, MessageType.Error) ;
+                            throw new TypeCheckerException(msg, errCode, PigException.INPUT, fee) ;
+                        }
+
+                        if (innerType != expectedType) {
+                            insertAtomicCastForJoinInnerPlan(innerPlan,
+                                                                frj,
+                                                                expectedType) ;
+                        }
+                    }
+                }
+            }
+        }
+        catch (FrontendException fe) {
+            int errCode = 1060;
+            String msg = "Cannot resolve Join output schema" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            throw new TypeCheckerException(msg, errCode, PigException.INPUT, fe) ;
+        }
+
+        // casts may have been inserted above, so compute the schema again
+        try {
+            frj.regenerateSchema() ;
+        }
+        catch (FrontendException fe) {
+            int errCode = 1060;
+            String msg = "Cannot resolve Join output schema" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            throw new TypeCheckerException(msg, errCode, PigException.INPUT, fe) ;
+        }
+    }
+
     /**
      * COGroup
      * All group by cols from all inputs have to be of the
@@ -2407,6 +2532,36 @@
         this.visit(cast);
     }
 
+    // Appends an LOCast to toType as the new leaf of a Join inner plan, then visits the cast.
+    // Throws TypeCheckerException if toType is unusable or the plan lacks exactly one leaf.
+    private void insertAtomicCastForJoinInnerPlan(LogicalPlan innerPlan,
+            LOJoin frj, byte toType) throws VisitorException {
+        if (!DataType.isUsableType(toType)) {
+            int errCode = 1051;
+            String msg = "Cannot cast to " + DataType.findTypeName(toType);
+            throw new TypeCheckerException(msg, errCode, PigException.INPUT);
+        }
+        List<LogicalOperator> leaves = innerPlan.getLeaves();
+        if (leaves.size() != 1) { // an empty plan must not reach get(0) below
+            int errCode = 2060;
+            String msg = "Expected one leaf. Found " + leaves.size() + " leaves.";
+            throw new TypeCheckerException(msg, errCode, PigException.BUG);
+        }
+        ExpressionOperator currentOutput = (ExpressionOperator) leaves.get(0);
+        collectCastWarning(frj, currentOutput.getType(), toType);
+        OperatorKey newKey = genNewOperatorKey(currentOutput);
+        LOCast cast = new LOCast(innerPlan, newKey, toType);
+        innerPlan.add(cast);
+        try {
+            innerPlan.connect(currentOutput, cast);
+        } catch (PlanException pe) {
+            int errCode = 2059;
+            String msg = "Problem with inserting cast operator for fragment replicate join in plan.";
+            throw new TypeCheckerException(msg, errCode, PigException.BUG, pe);
+        }
+        this.visit(cast);
+    }
+
     // This helps insert casting to atomic types in COGroup's inner plans
     // as a new leave of the plan
     private void insertAtomicCastForCOGroupInnerPlan(LogicalPlan innerPlan,

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=799141&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Thu Jul 30 00:30:25 2009
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+
+import java.io.*;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Tests for joins that use the "skewed" join implementation, run through a
+ * PigServer configured from a MiniCluster.
+ */
+public class TestSkewedJoin extends TestCase{
+    private static final String INPUT_FILE1 = "SkewedJoinInput1.txt";
+    private static final String INPUT_FILE2 = "SkewedJoinInput2.txt";
+    private static final String INPUT_FILE3 = "SkewedJoinInput3.txt";
+
+    private PigServer pigServer;
+    private MiniCluster cluster = MiniCluster.buildCluster();
+
+    /**
+     * Creates the PigServer in MAPREDUCE mode and sets the two
+     * pig.skewedjoin.reduce.* properties used by every test in this class.
+     */
+    public TestSkewedJoin() throws ExecException, IOException{
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.maxtuple", "5");
+        pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.memusage", "0.1");
+    }
+
+    /** Writes the three local input files and copies them to the cluster. */
+    @Before
+    public void setUp() throws Exception {
+        createFiles();
+    }
+
+    /**
+     * Builds the fixtures: file 1 holds 12 rows for each of the ids 100, 200
+     * and 300; files 2 and 3 hold a few rows per id plus a row for id 400.
+     */
+    private void createFiles() throws IOException {
+        String[] skewed = new String[36];
+        for (int k = 0; k < skewed.length; k += 3) {
+            skewed[k] = "100\tapple1\taaa" + k;
+            skewed[k + 1] = "200\torange1\tbbb" + (k + 1);
+            skewed[k + 2] = "300\tstrawberry\tccc" + (k + 2);
+        }
+        writeLines(INPUT_FILE1, skewed);
+        writeLines(INPUT_FILE2,
+                "100\tapple1", "100\tapple2", "100\tapple2", "200\torange1",
+                "200\torange2", "300\tstrawberry", "400\tpear");
+        writeLines(INPUT_FILE3,
+                "100\tapple1", "100\tapple2", "200\torange1", "200\torange2",
+                "300\tstrawberry", "300\tstrawberry2", "400\tpear");
+
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE1, INPUT_FILE1);
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2);
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
+    }
+
+    /**
+     * Writes each line to a local file, closing the writer even on failure.
+     * PrintWriter swallows I/O errors, so checkError() is used to surface them.
+     */
+    private static void writeLines(String fileName, String... lines) throws IOException {
+        PrintWriter writer = new PrintWriter(new FileWriter(fileName));
+        try {
+            for (String line : lines) {
+                writer.println(line);
+            }
+            if (writer.checkError()) {
+                throw new IOException("Failed to write test input " + fileName);
+            }
+        } finally {
+            writer.close();
+        }
+    }
+
+    /** Removes the input files from the local file system and from the cluster. */
+    @After
+    public void tearDown() throws Exception {
+        new File(INPUT_FILE1).delete();
+        new File(INPUT_FILE2).delete();
+        new File(INPUT_FILE3).delete();
+
+        Util.deleteFile(cluster, INPUT_FILE1);
+        Util.deleteFile(cluster, INPUT_FILE2);
+        Util.deleteFile(cluster, INPUT_FILE3);
+    }
+
+    /** Opens an iterator on the given alias and gathers every tuple into a bag. */
+    private DataBag collect(String alias) throws IOException {
+        DataBag result = BagFactory.getInstance().newDefaultBag();
+        Iterator<Tuple> iter = pigServer.openIterator(alias);
+        while (iter.hasNext()) {
+            result.add(iter.next());
+        }
+        return result;
+    }
+
+    /**
+     * Joins two grouped relations with and without the "skewed" hint and
+     * checks that both runs return non-empty bags that compare as equal.
+     */
+    public void testSkewedJoinWithGroup() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
+        pigServer.registerQuery("C = GROUP A by id;");
+        pigServer.registerQuery("D = GROUP B by id;");
+
+        pigServer.registerQuery("E = join C by group, D by group using \"skewed\" parallel 5;");
+        DataBag skewedResult = collect("E");
+        pigServer.registerQuery("E = join C by group, D by group;");
+        DataBag regularResult = collect("E");
+
+        Assert.assertTrue(skewedResult.size()>0 && regularResult.size()>0);
+        Assert.assertEquals(true, TestHelper.compareBags(skewedResult, regularResult));
+    }
+
+    /** Expects a skewed join requested with "parallel 1" to fail with an exception. */
+    public void testSkewedJoinReducers() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
+        try {
+            pigServer.registerQuery("C = join A by id, B by id using \"skewed\" parallel 1;");
+            collect("C");
+        } catch (Exception e) {
+            // Expected outcome: any exception here counts as the rejection under test.
+            return;
+        }
+
+        fail("Should throw exception, not enough reducers");
+    }
+
+    /** Expects a skewed join over three inputs to fail with an exception. */
+    public void testSkewedJoin3Way() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
+        pigServer.registerQuery("C = LOAD '" + INPUT_FILE3 + "' as (id, name);");
+        try {
+            pigServer.registerQuery("D = join A by id, B by id, C by id using \"skewed\" parallel 5;");
+            collect("D");
+        } catch (Exception e) {
+            // Expected outcome: any exception here counts as the rejection under test.
+            return;
+        }
+
+        fail("Should throw exception, do not support 3 way join");
+    }
+}