Posted to commits@pig.apache.org by ol...@apache.org on 2009/07/30 02:30:26 UTC
svn commit: r799141 [2/2] - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/
src/org/apache/pig/backend/hadoop/...
Added: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=799141&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Thu Jul 30 00:30:25 2009
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * Partitions reducers for skewed keys. This is used in skewed join during
+ * the sampling process. It figures out how many reducers are required to
+ * process a skewed key without causing a spill and allocates that number of
+ * reducers to the key. This UDF outputs a map which contains 2 keys:
+ *
+ * <li>"totalreducers": the value is an integer which indicates the
+ * total number of reducers for this join job </li>
+ * <li>"partition.list": the value is a bag which contains a
+ * list of tuples with each tuple representing partitions for a skewed key.
+ * The tuple has the format <join key>,<min index of reducer>,
+ * <max index of reducer> </li>
+ *
+ * For example, suppose a join job configures 10 reducers and the sampling
+ * process finds 2 skewed keys: "swpv" needs 4 reducers and "swps"
+ * needs 2 reducers. The output file would look like the following:
+ *
+ * {totalreducers=10, partition.list={(swpv,0,3), (swps,4,5)}}
+ *
+ * The name of this file is passed to the next MR job, which does the
+ * actual join. That job uses this information to partition skewed keys properly.
+ *
+ */
+
+public class PartitionSkewedKeys extends EvalFunc<Map<String, Object>> {
+
+ public static final String PARTITION_LIST = "partition.list";
+
+ public static final String TOTAL_REDUCERS = "totalreducers";
+
+ private Log log = LogFactory.getLog(getClass());
+
+ BagFactory mBagFactory = BagFactory.getInstance();
+
+ TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+ private int currentIndex_;
+
+ private int totalReducers_;
+
+ private long totalMemory_;
+
+ private String inputFile_;
+
+ private long inputFileSize_;
+
+ private long totalSampleCount_;
+
+ private double heapPercentage_;
+
+ // specifies how many tuples a reducer can hold for a key.
+ // this is for testing purposes. If not specified, it is
+ // calculated based on the memory size and the size of a tuple
+ private int tupleMCount_;
+
+ public PartitionSkewedKeys() {
+ this(null);
+ }
+
+ public PartitionSkewedKeys(String[] args) {
+ totalReducers_ = -1;
+ currentIndex_ = 0;
+ inputFileSize_ = -1;
+
+ if (args != null && args.length > 0) {
+ heapPercentage_ = Double.parseDouble(args[0]);
+ tupleMCount_ = Integer.parseInt(args[1]);
+ inputFile_ = args[2];
+ } else {
+ heapPercentage_ = 0.5;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("pig.skewedjoin.reduce.memusage=" + heapPercentage_);
+ log.debug("input file: " + inputFile_);
+ }
+
+ }
+
+ /**
+ * The first field in the input tuple is the number of reducers.
+ *
+ * The second field is the *sorted* bag of samples.
+ */
+ public Map<String, Object> exec(Tuple in) throws IOException {
+ // get size of input file in bytes
+ if (inputFileSize_ == -1) {
+ try {
+ inputFileSize_ = FileLocalizer.getSize(inputFile_);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ Map<String, Object> output = new HashMap<String, Object>();
+
+ if (in == null || in.size() == 0) {
+ return null;
+ }
+
+ totalMemory_ = (long) (Runtime.getRuntime().maxMemory() * heapPercentage_);
+ log.info("Maximum of available memory is " + totalMemory_);
+
+ ArrayList<Tuple> reducerList = new ArrayList<Tuple>();
+
+ Tuple currentTuple = null;
+ long count = 0;
+ long totalMSize = 0;
+ long totalDSize = 0;
+ try {
+ totalReducers_ = (Integer) in.get(0);
+ DataBag samples = (DataBag) in.get(1);
+
+ totalSampleCount_ = samples.size();
+
+ log.info("inputFileSize: " + inputFileSize_);
+ log.info("totalSample: " + totalSampleCount_);
+ log.info("totalReducers: " + totalReducers_);
+
+ int maxReducers = 0;
+ Iterator<Tuple> iter = samples.iterator();
+ while (iter.hasNext()) {
+ Tuple t = iter.next();
+ if (hasSameKey(currentTuple, t) || currentTuple == null) {
+ count++;
+ totalMSize += getMemorySize(t);
+ totalDSize += getDiskSize(t);
+ } else {
+ Pair<Tuple, Integer> p = calculateReducers(currentTuple,
+ count, totalMSize, totalDSize);
+ Tuple rt = p.first;
+ if (rt != null) {
+ reducerList.add(rt);
+ }
+ if (maxReducers < p.second) {
+ maxReducers = p.second;
+ }
+ count = 1;
+ totalMSize = getMemorySize(t);
+ totalDSize = getDiskSize(t);
+ }
+
+ currentTuple = t;
+ }
+
+ // add last key
+ if (count > 0) {
+ Pair<Tuple, Integer> p = calculateReducers(currentTuple, count,
+ totalMSize, totalDSize);
+ Tuple rt = p.first;
+ if (rt != null) {
+ reducerList.add(rt);
+ }
+ if (maxReducers < p.second) {
+ maxReducers = p.second;
+ }
+ }
+
+ if (maxReducers > totalReducers_) {
+ throw new RuntimeException("You need at least " + maxReducers
+ + " reducers to run this job.");
+ }
+
+ output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList));
+ output.put(TOTAL_REDUCERS, Integer.valueOf(totalReducers_));
+
+ log.info(output.toString());
+
+ return output;
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Pair<Tuple, Integer> calculateReducers(Tuple currentTuple,
+ long count, long totalMSize, long totalDSize) {
+ // get average memory size per tuple
+ double avgM = totalMSize / (double) count;
+ // get average disk size per tuple
+ double avgD = totalDSize / (double) count;
+
+ // get the number of tuples that can fit into memory
+ long tupleMCount = (tupleMCount_ <= 0)?(long) (totalMemory_ / avgM): tupleMCount_;
+
+ // get the number of total tuples for this key
+ long tupleCount = (long) (((double) count) / totalSampleCount_
+ * inputFileSize_ / avgD);
+
+
+ int redCount = (int) Math.ceil((double) tupleCount
+ / tupleMCount);
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("avgM: " + avgM);
+ log.debug("avgD: " + avgD);
+ log.debug("count: " + count);
+ log.debug("A reducer can take " + tupleMCount + " tuples and "
+ + tupleCount + " tuples are find for " + currentTuple);
+ log.debug("key " + currentTuple + " need " + redCount + " reducers");
+ }
+
+ // this is not a skewed key
+ if (redCount == 1) {
+ return new Pair<Tuple, Integer>(null, 1);
+ }
+
+ Tuple t = this.mTupleFactory.newTuple(currentTuple.size());
+ int i = 0;
+ try {
+ // set keys
+ for (; i < currentTuple.size() - 2; i++) {
+ t.set(i, currentTuple.get(i));
+ }
+
+ // set the min index of reducer for this key
+ t.set(i++, currentIndex_);
+ currentIndex_ = (currentIndex_ + redCount) % totalReducers_ - 1;
+ if (currentIndex_ < 0) {
+ currentIndex_ += totalReducers_;
+ }
+ // set the max index of reducer for this key
+ t.set(i++, currentIndex_);
+ } catch (ExecException e) {
+ throw new RuntimeException("Failed to set value to tuple." + e);
+ }
+
+ currentIndex_ = (currentIndex_ + 1) % totalReducers_;
+
+ Pair<Tuple, Integer> p = new Pair<Tuple, Integer>(t, redCount);
+
+ return p;
+ }
+
+ // the last two fields of the tuple are the memory size and the disk size
+ private long getMemorySize(Tuple t) {
+ int s = t.size();
+ try {
+ return (Long) t.get(s - 2);
+ } catch (ExecException e) {
+ throw new RuntimeException(
+ "Unable to retrieve the size field from tuple.", e);
+ }
+ }
+
+ // the last two fields of the tuple are the memory size and the disk size
+ private long getDiskSize(Tuple t) {
+ int s = t.size();
+ try {
+ return (Long) t.get(s - 1);
+ } catch (ExecException e) {
+ throw new RuntimeException(
+ "Unable to retrieve the size field from tuple.", e);
+ }
+ }
+
+ private boolean hasSameKey(Tuple t1, Tuple t2) {
+ // Have to break the tuple down and compare it field to field.
+ int sz1 = t1 == null ? 0 : t1.size();
+ int sz2 = t2 == null ? 0 : t2.size();
+ if (sz2 != sz1) {
+ return false;
+ }
+
+ for (int i = 0; i < sz1 - 2; i++) {
+ try {
+ int c = DataType.compare(t1.get(i), t2.get(i));
+ if (c != 0) {
+ return false;
+ }
+ } catch (ExecException e) {
+ throw new RuntimeException("Unable to compare tuples", e);
+ }
+ }
+
+ return true;
+ }
+
+}
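
For illustration, a minimal sketch (not part of this commit) of the arithmetic in calculateReducers() above, worked on made-up numbers; the class name and every figure are hypothetical, only the formulas come from the code:

    // Hypothetical numbers: 100,000 samples drawn from a 1 GB input,
    // 20,000 of them sharing one key, tuples of that key averaging
    // 256 bytes in memory and 64 bytes on disk, 100 MB of usable heap.
    public class ReducerMath {
        public static void main(String[] args) {
            long count = 20000;                 // samples seen for this key
            long totalSampleCount = 100000;     // all samples taken
            long inputFileSize = 1L << 30;      // 1 GB input
            long totalMemory = 100L << 20;      // usable reducer heap
            double avgM = 256.0, avgD = 64.0;   // avg memory/disk bytes per tuple

            // tuples one reducer can hold in memory
            long tupleMCount = (long) (totalMemory / avgM);             // 409600
            // estimated total tuples for this key across the whole input
            long tupleCount = (long) (((double) count) / totalSampleCount
                    * inputFileSize / avgD);                            // 3355443
            int redCount = (int) Math.ceil((double) tupleCount / tupleMCount);
            System.out.println(redCount);       // 9: spread over 9 reducers
        }
    }

The key is then assigned a contiguous range of 9 of the job's reducer indexes, which is what the (min index, max index) pair in partition.list records.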
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java Thu Jul 30 00:30:25 2009
@@ -15,9 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.pig.impl.builtin;
-
-import java.io.IOException;
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
import java.util.Map;
import org.apache.pig.ExecType;
@@ -26,20 +26,22 @@
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-
+
+
/**
* A loader that samples the data. This loader can subsume any loader that
* can handle starting in the middle of a record. Loaders that can
* handle this should implement the SamplableLoader interface.
*/
public class RandomSampleLoader implements LoadFunc {
-
+
private int numSamples;
- private long skipInterval;
+ private long skipInterval;
+ private TupleFactory factory;
private SamplableLoader loader;
/**
@@ -54,21 +56,27 @@
loader = (SamplableLoader)PigContext.instantiateFuncFromSpec(funcSpec);
numSamples = Integer.valueOf(ns);
}
-
- @Override
- public void bindTo(String fileName, BufferedPositionedInputStream is, long offset, long end) throws IOException {
- skipInterval = (end - offset)/numSamples;
+
+
+ public void bindTo(String fileName, BufferedPositionedInputStream is, long offset, long end) throws IOException {
+ skipInterval = (end - offset)/numSamples;
loader.bindTo(fileName, is, offset, end);
- }
-
- @Override
- public Tuple getNext() throws IOException {
+ }
+
+
+ public Tuple getNext() throws IOException {
long initialPos = loader.getPosition();
- Tuple t = loader.getSampledTuple();
+
+ // make sure we move to a record boundary
+ Tuple t = loader.getSampledTuple();
+ long middlePos = loader.getPosition();
+
+ // we move to the next record boundary
+ t = loader.getSampledTuple();
long finalPos = loader.getPosition();
-
- long toSkip = skipInterval - (finalPos - initialPos);
- if (toSkip > 0) {
+
+ long toSkip = skipInterval - (finalPos - initialPos);
+ if (toSkip > 0) {
long rc = loader.skip(toSkip);
// if we did not skip enough
@@ -84,13 +92,31 @@
}
remainingSkip -= rc;
}
- }
- return t;
- }
-
+ }
+
+ if (t == null) {
+ return null;
+ }
+
+ if (factory == null) {
+ factory = TupleFactory.getInstance();
+ }
+
+ // copy the existing fields
+ Tuple m = factory.newTuple(t.size()+1);
+ for(int i=0; i<t.size(); i++) {
+ m.set(i, t.get(i));
+ }
+
+ // add the on-disk size of the tuple at the end
+ m.set(t.size(), (finalPos-middlePos));
+
+ return m;
+ }
+
public Integer bytesToInteger(byte[] b) throws IOException {
return loader.bytesToInteger(b);
- }
+ }
public Long bytesToLong(byte[] b) throws IOException {
return loader.bytesToLong(b);
@@ -130,4 +156,4 @@
DataStorage storage) throws IOException {
return loader.determineSchema(fileName, execType, storage);
}
-}
\ No newline at end of file
+}
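
For illustration, a minimal sketch (not part of this commit) of the position bookkeeping in getNext() above, on plain longs with hypothetical byte offsets; the first read only syncs to a record boundary, the second read is the sample whose on-disk size gets appended to the tuple:

    public class SampleSkipMath {
        public static void main(String[] args) {
            long offset = 0, end = 1000000;   // hypothetical split boundaries
            int numSamples = 100;
            long skipInterval = (end - offset) / numSamples;   // 10000 bytes

            long initialPos = 0;    // before the first, synchronizing read
            long middlePos = 180;   // at a record boundary
            long finalPos = 410;    // after reading the sampled record

            // 230 bytes: the disk size appended as the tuple's last field
            long diskSize = finalPos - middlePos;
            // 9590 bytes: skip ahead so samples stay one interval apart
            long toSkip = skipInterval - (finalPos - initialPos);
            System.out.println(diskSize + " " + toSkip);
        }
    }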
Added: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/TupleSize.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/TupleSize.java?rev=799141&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/TupleSize.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/TupleSize.java Thu Jul 30 00:30:25 2009
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * UDF to get the memory size and disk size of a tuple. The disk size is
+ * expected in the last field of the input tuple, where the sample loader
+ * put it. It is used by skewed join.
+ *
+ */
+
+public class TupleSize extends EvalFunc<Tuple>{
+
+ private TupleFactory factory;
+
+ public TupleSize() {
+ factory = TupleFactory.getInstance();
+ }
+
+ /**
+ * Get memory size and disk size of input tuple
+ */
+ public Tuple exec(Tuple in) throws IOException {
+ if (in == null) {
+ return null;
+ }
+
+ Tuple t = factory.newTuple(2);
+ t.set(0, in.getMemorySize());
+ t.set(1, in.get(in.size()-1));
+
+ return t;
+ }
+
+ public Type getReturnType(){
+ return Tuple.class;
+ }
+}
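
For illustration, a hypothetical driver (not part of this commit) showing what TupleSize produces for one sampled tuple; the 230-byte last field stands in for the disk size the sample loader appends:

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.apache.pig.impl.builtin.TupleSize;

    public class TupleSizeDemo {
        public static void main(String[] args) throws Exception {
            TupleFactory tf = TupleFactory.getInstance();
            Tuple sample = tf.newTuple(3);
            sample.set(0, 100);         // join key
            sample.set(1, "apple1");    // payload
            sample.set(2, 230L);        // disk size appended by RandomSampleLoader

            Tuple sizes = new TupleSize().exec(sample);
            System.out.println(sizes);  // (<memory size of sample>, 230)
        }
    }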
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Thu Jul 30 00:30:25 2009
@@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.Stack;
import java.util.Properties ;
@@ -42,6 +43,7 @@
import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SliceWrapper;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.WrappedIOException;
@@ -151,11 +153,19 @@
public static InputStream openDFSFile(String fileName) throws IOException {
SliceWrapper wrapper = PigInputFormat.getActiveSplit();
- if (wrapper == null)
+ JobConf conf = null;
+ if (wrapper == null) {
+ conf = PigMapReduce.sJobConf;
+ }else{
+ conf = wrapper.getJobConf();
+ }
+
+ if (conf == null) {
throw new RuntimeException(
"can't open DFS file while executing locally");
+ }
- return openDFSFile(fileName, ConfigurationUtil.toProperties(wrapper.getJobConf()));
+ return openDFSFile(fileName, ConfigurationUtil.toProperties(conf));
}
@@ -165,6 +175,42 @@
return openDFSFile(elem);
}
+ public static long getSize(String fileName) throws IOException {
+ SliceWrapper wrapper = PigInputFormat.getActiveSplit();
+
+ JobConf conf = null;
+ if (wrapper == null) {
+ conf = PigMapReduce.sJobConf;
+ }else{
+ conf = wrapper.getJobConf();
+ }
+
+ if (conf == null) {
+ throw new RuntimeException(
+ "can't get the size of a DFS file while executing locally");
+ }
+
+ return getSize(fileName, ConfigurationUtil.toProperties(conf));
+ }
+
+ public static long getSize(String fileName, Properties properties) throws IOException {
+ DataStorage dds = new HDataStorage(properties);
+ ElementDescriptor elem = dds.asElement(fileName);
+
+ // recursively get all the files under this path
+ ElementDescriptor[] allElems = getFileElementDescriptors(elem);
+
+ long size = 0;
+
+ // add up the sizes of all files found
+ for (int i=0; i<allElems.length; i++) {
+ Map<String, Object> stats = allElems[i].getStatistics();
+ size += (Long) (stats.get(ElementDescriptor.LENGTH_KEY));
+ }
+
+ return size;
+ }
+
private static InputStream openDFSFile(ElementDescriptor elem) throws IOException{
ElementDescriptor[] elements = null;
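
For illustration, a minimal sketch (not part of this commit) of calling the new helper directly; the path is hypothetical and, on a real cluster, the properties would carry the Hadoop file system configuration:

    import java.util.Properties;

    import org.apache.pig.impl.io.FileLocalizer;

    public class SizeDemo {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();   // hypothetical fs settings
            // sums the lengths of all files found under the given path
            long bytes = FileLocalizer.getSize("/user/pig/skewed/input", props);
            System.out.println(bytes);
        }
    }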
Added: hadoop/pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java?rev=799141&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java Thu Jul 30 00:30:25 2009
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * NullablePartitionWritable is an adaptor class around PigNullableWritable that adds a partition
+ * index to the key.
+ */
+public class NullablePartitionWritable extends PigNullableWritable{
+ private byte partitionIndex;
+ private PigNullableWritable key;
+
+ public NullablePartitionWritable() {
+
+ }
+
+ public NullablePartitionWritable(PigNullableWritable k) {
+ setKey(k);
+ }
+
+ public void setKey(PigNullableWritable k) {
+ key = k;
+ }
+
+ public PigNullableWritable getKey() {
+ return key;
+ }
+
+ public void setPartition(byte n) {
+ partitionIndex = n;
+ }
+
+ public byte getPartition() {
+ return partitionIndex;
+ }
+
+ public int compareTo(Object o) {
+ return key.compareTo(((NullablePartitionWritable)o).getKey());
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ String c = in.readUTF();
+ try{
+ key = (PigNullableWritable)Class.forName(c).newInstance();
+ }catch(Exception e) {
+ throw new IOException(e);
+ }
+ key.readFields(in);
+ }
+
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(key.getClass().getName());
+ key.write(out);
+ }
+
+ public boolean isNull() {
+ return key.isNull();
+ }
+
+ public void setNull(boolean isNull) {
+ key.setNull(isNull);
+ }
+
+ public byte getIndex() {
+ return key.getIndex();
+ }
+
+ public void setIndex(byte index) {
+ key.setIndex(index);
+ }
+
+ public Object getValueAsPigType() {
+ return key.getValueAsPigType();
+ }
+
+ public int hashCode() {
+ return key.hashCode();
+ }
+
+ public String toString() {
+ return "Partition: " + partitionIndex + " " + key.toString();
+ }
+}
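
For illustration, a hypothetical round trip (not part of this commit) through the write()/readFields() pair above; NullableIntWritable is one of the concrete PigNullableWritable classes already in Pig. Note that the partition index itself is not serialized by this pair, only the wrapped key's class name and payload:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.pig.impl.io.NullableIntWritable;
    import org.apache.pig.impl.io.NullablePartitionWritable;

    public class RoundTripDemo {
        public static void main(String[] args) throws Exception {
            NullablePartitionWritable out =
                new NullablePartitionWritable(new NullableIntWritable(42));
            out.setPartition((byte) 3);

            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            out.write(new DataOutputStream(buf));

            NullablePartitionWritable in = new NullablePartitionWritable();
            in.readFields(new DataInputStream(
                    new ByteArrayInputStream(buf.toByteArray())));
            System.out.println(in.getValueAsPigType());  // 42
        }
    }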
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java Thu Jul 30 00:30:25 2009
@@ -90,6 +90,9 @@
else if(op instanceof LOFRJoin){
return ((LOFRJoin)op).getJoinColPlans();
}
+ else if(op instanceof LOJoin){
+ return ((LOJoin)op).getJoinPlans();
+ }
return new MultiMap<LogicalOperator, LogicalPlan>();
}
Added: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java?rev=799141&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java Thu Jul 30 00:30:25 2009
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.pig.PigException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
+import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.plan.RequiredFields;
+import org.apache.pig.impl.plan.ProjectionMap;
+
+public class LOJoin extends LogicalOperator {
+ private static final long serialVersionUID = 2L;
+
+ /**
+ * Enum for the type of join
+ */
+ public static enum JOINTYPE {
+ REGULAR, /// Regular join
+ REPLICATED, /// Fragment Replicated join
+ SKEWED /// Skewed Join
+ };
+
+ /**
+ * LOJoin maps each input relational operator to the list of inner plans
+ * that compute its join columns.
+ */
+ private static Log log = LogFactory.getLog(LOJoin.class);
+ private MultiMap<LogicalOperator, LogicalPlan> mJoinPlans;
+
+ private JOINTYPE mJoinType; // Retains the type of the join
+
+ /**
+ *
+ * @param plan
+ * LogicalPlan this operator is a part of.
+ * @param k
+ * OperatorKey for this operator
+ * @param joinPlans
+ * the join columns
+ * @param jt
+ * indicates the type of join - regular, skewed or fragment replicated
+ */
+ public LOJoin(
+ LogicalPlan plan,
+ OperatorKey k,
+ MultiMap<LogicalOperator, LogicalPlan> joinPlans,
+ JOINTYPE jt) {
+ super(plan, k);
+ mJoinPlans = joinPlans;
+ mJoinType = jt;
+ }
+
+ public List<LogicalOperator> getInputs() {
+ return mPlan.getPredecessors(this);
+ }
+
+ public MultiMap<LogicalOperator, LogicalPlan> getJoinPlans() {
+ return mJoinPlans;
+ }
+
+ public void setJoinPlans(MultiMap<LogicalOperator, LogicalPlan> joinPlans) {
+System.out.println("#@ resetting join plans");
+ mJoinPlans = joinPlans;
+ }
+
+ /**
+ * Returns the type of join.
+ */
+ public JOINTYPE getJoinType() {
+ return mJoinType;
+ }
+
+ @Override
+ public String name() {
+ return "LOJoin " + mKey.scope + "-" + mKey.id;
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return true;
+ }
+
+ @Override
+ public Schema getSchema() throws FrontendException {
+ List<LogicalOperator> inputs = mPlan.getPredecessors(this);
+ mType = DataType.BAG;//mType is from the super class
+ Hashtable<String, Integer> nonDuplicates = new Hashtable<String, Integer>();
+ if(!mIsSchemaComputed){
+ List<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>();
+ int i=-1;
+ for (LogicalOperator op : inputs) {
+ try {
+ Schema cSchema = op.getSchema();
+ if(cSchema!=null){
+
+ for (FieldSchema schema : cSchema.getFields()) {
+ ++i;
+ if(nonDuplicates.containsKey(schema.alias))
+ {
+ if(nonDuplicates.get(schema.alias)!=-1) {
+ nonDuplicates.remove(schema.alias);
+ nonDuplicates.put(schema.alias, -1);
+ }
+ }
+ else
+ nonDuplicates.put(schema.alias, i);
+ FieldSchema newFS = new FieldSchema(op.getAlias()+"::"+schema.alias,schema.schema,schema.type);
+ newFS.setParent(schema.canonicalName, op);
+ fss.add(newFS);
+ }
+ }
+ else
+ fss.add(new FieldSchema(null,DataType.BYTEARRAY));
+ } catch (FrontendException ioe) {
+ mIsSchemaComputed = false;
+ mSchema = null;
+ throw ioe;
+ }
+ }
+ mIsSchemaComputed = true;
+ for (Entry<String, Integer> ent : nonDuplicates.entrySet()) {
+ int ind = ent.getValue();
+ if(ind==-1) continue;
+ FieldSchema prevSch = fss.get(ind);
+ fss.set(ind, new FieldSchema(ent.getKey(),prevSch.schema,prevSch.type));
+ }
+ mSchema = new Schema(fss);
+ }
+ return mSchema;
+ }
+
+ public boolean isTupleJoinCol() {
+ List<LogicalOperator> inputs = mPlan.getPredecessors(this);
+ if (inputs == null || inputs.size() == 0) {
+ throw new AssertionError("LOJoin.isTupleJoinCol() can only becalled "
+ + "after it has an input ") ;
+ }
+ return mJoinPlans.get(inputs.get(0)).size() > 1 ;
+ }
+
+ @Override
+ public void visit(LOVisitor v) throws VisitorException {
+ v.visit(this);
+ }
+
+ /***
+ *
+ * This switches the mapping
+ *
+ * oldOp -> List of inner plans
+ * to
+ * newOp -> List of inner plans
+ *
+ * which is useful when there is a structural change in LogicalPlan
+ *
+ * @param oldOp the old operator
+ * @param newOp the new operator
+ */
+ public void switchJoinColPlanOp(LogicalOperator oldOp,
+ LogicalOperator newOp) {
+ Collection<LogicalPlan> innerPlans = mJoinPlans.removeKey(oldOp) ;
+ mJoinPlans.put(newOp, innerPlans);
+ }
+
+ public void unsetSchema() throws VisitorException{
+ for(LogicalOperator input: getInputs()) {
+ Collection<LogicalPlan> joinPlans = mJoinPlans.get(input);
+ if(joinPlans!=null)
+ for(LogicalPlan plan : joinPlans) {
+ SchemaRemover sr = new SchemaRemover(plan);
+ sr.visit();
+ }
+ }
+ super.unsetSchema();
+ }
+
+ /**
+ * This can be used to get the merged type of output join col
+ * only when the join col is of atomic type
+ * @return The type of the join col
+ */
+ public byte getAtomicJoinColType() throws FrontendException {
+ if (isTupleJoinCol()) {
+ int errCode = 1010;
+ String msg = "getAtomicJoinColType is used only when"
+ + " dealing with atomic group col";
+ throw new FrontendException(msg, errCode, PigException.INPUT, false, null) ;
+ }
+
+ byte groupType = DataType.BYTEARRAY ;
+ // merge all the inner plan outputs so we know what type
+ // our join column should be
+ for(int i=0;i < getInputs().size(); i++) {
+ LogicalOperator input = getInputs().get(i) ;
+ List<LogicalPlan> innerPlans
+ = new ArrayList<LogicalPlan>(getJoinPlans().get(input)) ;
+ if (innerPlans.size() != 1) {
+ int errCode = 1012;
+ String msg = "Each COGroup input has to have "
+ + "the same number of inner plans";
+ throw new FrontendException(msg, errCode, PigException.INPUT, false, null) ;
+ }
+ byte innerType = innerPlans.get(0).getSingleLeafPlanOutputType() ;
+ groupType = DataType.mergeType(groupType, innerType) ;
+ }
+
+ return groupType ;
+ }
+
+ /*
+ This implementation is based on the assumption that all the
+ inputs have the same join col tuple arity.
+ */
+ public Schema getTupleJoinSchema() throws FrontendException {
+ if (!isTupleJoinCol()) {
+ int errCode = 1011;
+ String msg = "getTupleJoinSchema is used only when"
+ + " dealing with tuple join col";
+ throw new FrontendException(msg, errCode, PigException.INPUT, false, null) ;
+ }
+
+ // this fsList represents all the columns in the join tuple
+ List<Schema.FieldSchema> fsList = new ArrayList<Schema.FieldSchema>() ;
+
+ int outputSchemaSize = getJoinPlans().get(getInputs().get(0)).size() ;
+
+ // by default, they are all bytearray
+ // for type checking, we don't care about aliases
+ for(int i=0; i<outputSchemaSize; i++) {
+ fsList.add(new Schema.FieldSchema(null, DataType.BYTEARRAY)) ;
+ }
+
+ // merge all the inner plan outputs so we know what type
+ // our join column should be
+ for(int i=0;i < getInputs().size(); i++) {
+ LogicalOperator input = getInputs().get(i) ;
+ List<LogicalPlan> innerPlans
+ = new ArrayList<LogicalPlan>(getJoinPlans().get(input)) ;
+
+ boolean seenProjectStar = false;
+ for(int j=0;j < innerPlans.size(); j++) {
+ byte innerType = innerPlans.get(j).getSingleLeafPlanOutputType() ;
+ ExpressionOperator eOp = (ExpressionOperator)innerPlans.get(j).getSingleLeafPlanOutputOp();
+
+ if(eOp instanceof LOProject) {
+ if(((LOProject)eOp).isStar()) {
+ seenProjectStar = true;
+ }
+ }
+
+ Schema.FieldSchema groupFs = fsList.get(j);
+ groupFs.type = DataType.mergeType(groupFs.type, innerType) ;
+ Schema.FieldSchema fs = eOp.getFieldSchema();
+ if(null != fs) {
+ groupFs.setParent(eOp.getFieldSchema().canonicalName, eOp);
+ } else {
+ groupFs.setParent(null, eOp);
+ }
+ }
+
+ if(seenProjectStar) {
+ int errCode = 1013;
+ String msg = "Grouping attributes can either be star (*) or a list of expressions, but not both.";
+ throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
+ }
+
+ }
+
+ return new Schema(fsList) ;
+ }
+
+ /**
+ * @see org.apache.pig.impl.logicalLayer.LogicalOperator#clone()
+ * Do not use the clone method directly. Operators are cloned when logical plans
+ * are cloned using {@link LogicalPlanCloner}
+ */
+ @Override
+ protected Object clone() throws CloneNotSupportedException {
+
+ // first start with LogicalOperator clone
+ LOJoin joinClone = (LOJoin)super.clone();
+
+ // create deep copy of other cogroup specific members
+ joinClone.mJoinPlans = new MultiMap<LogicalOperator, LogicalPlan>();
+ for (Iterator<LogicalOperator> it = mJoinPlans.keySet().iterator(); it.hasNext();) {
+ LogicalOperator relOp = it.next();
+ Collection<LogicalPlan> values = mJoinPlans.get(relOp);
+ for (Iterator<LogicalPlan> planIterator = values.iterator(); planIterator.hasNext();) {
+ LogicalPlanCloneHelper lpCloneHelper = new LogicalPlanCloneHelper(planIterator.next());
+ joinClone.mJoinPlans.put(relOp, lpCloneHelper.getClonedPlan());
+ }
+ }
+
+ return joinClone;
+ }
+
+ @Override
+ public ProjectionMap getProjectionMap() {
+
+ if(mIsProjectionMapComputed) return mProjectionMap;
+ mIsProjectionMapComputed = true;
+
+ Schema outputSchema;
+
+ try {
+ outputSchema = getSchema();
+ } catch (FrontendException fee) {
+ mProjectionMap = null;
+ return mProjectionMap;
+ }
+
+ if(outputSchema == null) {
+ mProjectionMap = null;
+ return mProjectionMap;
+ }
+
+ List<LogicalOperator> predecessors = (ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+ if(predecessors == null) {
+ mProjectionMap = null;
+ return mProjectionMap;
+ }
+
+ MultiMap<Integer, ProjectionMap.Column> mapFields = new MultiMap<Integer, ProjectionMap.Column>();
+ List<Integer> addedFields = new ArrayList<Integer>();
+ boolean[] unknownSchema = new boolean[predecessors.size()];
+ boolean anyUnknownInputSchema = false;
+ int outputColumnNum = 0;
+
+ for(int inputNum = 0; inputNum < predecessors.size(); ++inputNum) {
+ LogicalOperator predecessor = predecessors.get(inputNum);
+ Schema inputSchema = null;
+
+ try {
+ inputSchema = predecessor.getSchema();
+ } catch (FrontendException fee) {
+ mProjectionMap = null;
+ return mProjectionMap;
+ }
+
+ if(inputSchema == null) {
+ unknownSchema[inputNum] = true;
+ outputColumnNum++;
+ addedFields.add(inputNum);
+ anyUnknownInputSchema = true;
+ } else {
+ unknownSchema[inputNum] = false;
+ for(int inputColumn = 0; inputColumn < inputSchema.size(); ++inputColumn) {
+ mapFields.put(outputColumnNum++,
+ new ProjectionMap.Column(new Pair<Integer, Integer>(inputNum, inputColumn)));
+ }
+ }
+ }
+
+ //TODO
+ /*
+ * For now, if there is any input that has an unknown schema
+ * flag it and return a null ProjectionMap.
+ * In the future, when unknown schemas are handled
+ * mark inputs that have unknown schemas as output columns
+ * that have been added.
+ */
+
+ if(anyUnknownInputSchema) {
+ mProjectionMap = null;
+ return mProjectionMap;
+ }
+
+ if(addedFields.size() == 0) {
+ addedFields = null;
+ }
+
+ mProjectionMap = new ProjectionMap(mapFields, null, addedFields);
+ return mProjectionMap;
+ }
+
+ @Override
+ public List<RequiredFields> getRequiredFields() {
+ List<LogicalOperator> predecessors = mPlan.getPredecessors(this);
+
+ if(predecessors == null) {
+ return null;
+ }
+
+ List<RequiredFields> requiredFields = new ArrayList<RequiredFields>();
+
+ for(int inputNum = 0; inputNum < predecessors.size(); ++inputNum) {
+ Set<Pair<Integer, Integer>> fields = new HashSet<Pair<Integer, Integer>>();
+ Set<LOProject> projectSet = new HashSet<LOProject>();
+ boolean groupByStar = false;
+
+ for (LogicalPlan plan : this.getJoinPlans().get(predecessors.get(inputNum))) {
+ TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(plan);
+ try {
+ projectFinder.visit();
+ } catch (VisitorException ve) {
+ requiredFields.clear();
+ requiredFields.add(null);
+ return requiredFields;
+ }
+ projectSet.addAll(projectFinder.getProjectSet());
+ if(projectFinder.getProjectStarSet() != null) {
+ groupByStar = true;
+ }
+ }
+
+ if(groupByStar) {
+ requiredFields.add(new RequiredFields(true));
+ } else {
+ for (LOProject project : projectSet) {
+ for (int inputColumn : project.getProjection()) {
+ fields.add(new Pair<Integer, Integer>(inputNum, inputColumn));
+ }
+ }
+
+ if(fields.size() == 0) {
+ requiredFields.add(new RequiredFields(false, true));
+ } else {
+ requiredFields.add(new RequiredFields(new ArrayList<Pair<Integer, Integer>>(fields)));
+ }
+ }
+ }
+
+ return (requiredFields.size() == 0? null: requiredFields);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.impl.plan.Operator#rewire(org.apache.pig.impl.plan.Operator, org.apache.pig.impl.plan.Operator)
+ */
+ @Override
+ @SuppressWarnings("unchecked")
+ public void rewire(Operator oldPred, int oldPredIndex, Operator newPred, boolean useOldPred) throws PlanException {
+ super.rewire(oldPred, oldPredIndex, newPred, useOldPred);
+ LogicalOperator previous = (LogicalOperator) oldPred;
+ LogicalOperator current = (LogicalOperator) newPred;
+ Set<LogicalOperator> joinInputs = new HashSet<LogicalOperator>(mJoinPlans.keySet());
+ for(LogicalOperator input: joinInputs) {
+ if(input.equals(previous)) {
+ //replace the references to the key(i.e., previous) in the values with current
+ for(LogicalPlan plan: mJoinPlans.get(input)) {
+ try {
+ ProjectFixerUpper projectFixer = new ProjectFixerUpper(
+ plan, previous, oldPredIndex, current, useOldPred, this);
+ projectFixer.visit();
+ } catch (VisitorException ve) {
+ int errCode = 2144;
+ String msg = "Problem while fixing project inputs during rewiring.";
+ throw new PlanException(msg, errCode, PigException.BUG, ve);
+ }
+ }
+ //remove the key and the values
+ List<LogicalPlan> plans = (List<LogicalPlan>)mJoinPlans.get(previous);
+ mJoinPlans.removeKey(previous);
+
+ //reinsert new key and values
+ mJoinPlans.put(current, plans);
+ }
+ }
+ }
+}
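
For illustration (not part of this commit), the alias handling in getSchema() above, traced by hand on a hypothetical join of A(id, name) with B(id, code):

    pass 1 (prefix every field):     A::id, A::name, B::id, B::code
    pass 2 (un-prefix unique ones):  A::id, name,    B::id, code

"id" occurs in both inputs, so it keeps its "<input alias>::" prefix; "name" and "code" are unambiguous, so the second pass restores their bare aliases.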
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOPrinter.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOPrinter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOPrinter.java Thu Jul 30 00:30:25 2009
@@ -161,6 +161,15 @@
}
}
}
+ else if(node instanceof LOJoin){
+ MultiMap<LogicalOperator, LogicalPlan> plans = ((LOJoin)node).getJoinPlans();
+ for (LogicalOperator lo : plans.keySet()) {
+ // Visit the associated plans
+ for (LogicalPlan plan : plans.get(lo)) {
+ sb.append(planString(plan));
+ }
+ }
+ }
else if(node instanceof LOSort){
sb.append(planString(((LOSort)node).getSortColPlans()));
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOVisitor.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOVisitor.java Thu Jul 30 00:30:25 2009
@@ -22,6 +22,7 @@
import java.util.Set;
import java.util.Map;
import java.util.ArrayList;
+import java.util.Collection;
import org.apache.pig.impl.plan.PlanVisitor;
import org.apache.pig.impl.plan.PlanWalker;
@@ -154,6 +155,32 @@
}
}
+
+ /**
+ *
+ * @param loj
+ * the logical join operator that has to be visited
+ * @throws VisitorException
+ */
+ @SuppressWarnings("unchecked")
+ protected void visit(LOJoin loj) throws VisitorException {
+ // Visit each of the inputs of the join.
+ MultiMap<LogicalOperator, LogicalPlan> mapJoinPlans = loj.getJoinPlans();
+ for(LogicalOperator op: loj.getInputs()) {
+ for(LogicalPlan lp: mapJoinPlans.get(op)) {
+ if (null != lp) {
+ // TODO FIX - How do we know this should be a
+ // DependencyOrderWalker? We should be replicating the
+ // walker the current visitor is using.
+ PlanWalker w = new DependencyOrderWalker(lp);
+ pushWalker(w);
+ w.walk(this);
+ popWalker();
+ }
+ }
+ }
+ }
+
/**
*
* @param forEach
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/PlanSetter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/PlanSetter.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/PlanSetter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/PlanSetter.java Thu Jul 30 00:30:25 2009
@@ -63,6 +63,11 @@
super.visit(op);
}
+ public void visit(LOJoin op) throws VisitorException {
+ op.setPlan(mCurrentWalker.getPlan());
+ super.visit(op);
+ }
+
public void visit(LOConst op) throws VisitorException {
op.setPlan(mCurrentWalker.getPlan());
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java Thu Jul 30 00:30:25 2009
@@ -101,6 +101,32 @@
}
}
+ /* (non-Javadoc)
+ * @see org.apache.pig.impl.logicalLayer.LOVisitor#visit(org.apache.pig.impl.logicalLayer.LOJoin)
+ */
+ @Override
+ protected void visit(LOJoin frj) throws VisitorException {
+ //get the attributes of LOJoin that are modified during the translation
+
+ MultiMap<LogicalOperator, LogicalPlan> joinColPlans = frj.getJoinPlans();
+
+ for(LogicalOperator op: frj.getInputs()) {
+ ArrayList<LogicalPlan> newPlansAfterTranslation = new ArrayList<LogicalPlan>();
+ for(LogicalPlan lp: joinColPlans.get(op)) {
+ if (checkPlanForProjectStar(lp)) {
+ ArrayList<LogicalPlan> translatedPlans = translateProjectStarInPlan(lp);
+ for(int j = 0; j < translatedPlans.size(); ++j) {
+ newPlansAfterTranslation.add(translatedPlans.get(j));
+ }
+ } else {
+ newPlansAfterTranslation.add(lp);
+ }
+ }
+ joinColPlans.removeKey(op);
+ joinColPlans.put(op, newPlansAfterTranslation);
+ }
+ }
+
/**
*
* @param forEach
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java Thu Jul 30 00:30:25 2009
@@ -31,6 +31,7 @@
import org.apache.pig.impl.plan.optimizer.Transformer;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.LOFRJoin;
+import org.apache.pig.impl.logicalLayer.LOJoin;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.LOCogroup;
@@ -135,6 +136,10 @@
LOFRJoin frj = (LOFRJoin) before ;
frj.switchJoinColPlanOp(after, newNode);
}
+ if (before instanceof LOJoin) {
+ LOJoin frj = (LOJoin) before ;
+ frj.switchJoinColPlanOp(after, newNode);
+ }
// Visit all the inner plans of before and change their projects to
// connect to newNode instead of after.
@@ -144,6 +149,8 @@
plans.addAll((((LOCogroup)before).getGroupByPlans()).values());
} else if (before instanceof LOFRJoin) {
plans.addAll((((LOFRJoin)before).getJoinColPlans()).values());
+ } else if (before instanceof LOJoin) {
+ plans.addAll((((LOJoin)before).getJoinPlans()).values());
}
else if (before instanceof LOSort) {
plans.addAll(((LOSort)before).getSortColPlans());
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java Thu Jul 30 00:30:25 2009
@@ -37,6 +37,7 @@
import org.apache.pig.impl.logicalLayer.LOSplitOutput;
import org.apache.pig.impl.logicalLayer.LOUnion;
import org.apache.pig.impl.logicalLayer.LOFRJoin;
+import org.apache.pig.impl.logicalLayer.LOJoin;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -137,7 +138,7 @@
// Limit cannot be pushed up
if (predecessor instanceof LOCogroup || predecessor instanceof LOFilter ||
predecessor instanceof LOLoad || predecessor instanceof LOSplit ||
- predecessor instanceof LOSplitOutput || predecessor instanceof LODistinct || predecessor instanceof LOFRJoin)
+ predecessor instanceof LOSplitOutput || predecessor instanceof LODistinct || predecessor instanceof LOFRJoin || predecessor instanceof LOJoin)
{
return;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java Thu Jul 30 00:30:25 2009
@@ -211,4 +211,8 @@
super.visit(frj);
}
+ protected void visit(LOJoin frj) throws VisitorException {
+ frj.unsetSchema();
+ super.visit(frj);
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Jul 30 00:30:25 2009
@@ -299,6 +299,63 @@
}
/**
+ * Join parser. Currently can only handle skewed joins.
+ */
+ LogicalOperator parseJoin(ArrayList<CogroupInput> gis, LogicalPlan lp, LOJoin.JOINTYPE jt) throws ParseException, PlanException{
+ log.trace("Entering parseJoin");
+ // Skewed Join behaves as regular join in local mode
+ if (pigContext.getExecType() == ExecType.LOCAL && jt == LOJoin.JOINTYPE.SKEWED) {
+ return rewriteJoin(gis,lp);
+ }
+
+ int n = gis.size();
+
+ if (jt == LOJoin.JOINTYPE.SKEWED && n != 2) {
+ throw new ParseException("Skewed join can only be applied for 2-way joins");
+ }
+
+ ArrayList<LogicalOperator> los = new ArrayList<LogicalOperator>();
+ ArrayList<ArrayList<LogicalPlan>> plans = new ArrayList<ArrayList<LogicalPlan>>();
+ MultiMap<LogicalOperator, LogicalPlan> joinPlans = new MultiMap<LogicalOperator, LogicalPlan>();
+ boolean[] isInner = new boolean[n];
+
+ int arity = gis.get(0).plans.size();
+
+ for (int i = 0; i < n ; i++){
+
+ CogroupInput gi = gis.get(i);
+ los.add(gi.op);
+ ArrayList<LogicalPlan> planList = gi.plans;
+ plans.add(gi.plans);
+ int numJoinOps = planList.size();
+ log.debug("Number of join operators = " + numJoinOps);
+
+ if(arity != numJoinOps) {
+ throw new ParseException("The arity of the join columns do not match.");
+ }
+ for(int j = 0; j < numJoinOps; ++j) {
+ joinPlans.put(gi.op, planList.get(j));
+ for(LogicalOperator root: planList.get(j).getRoots()) {
+ log.debug("Join input plan root: " + root);
+ }
+ }
+ isInner[i] = gi.isInner;
+ }
+
+ LogicalOperator loj = new LOJoin(lp, new OperatorKey(scope, getNextId()), joinPlans, jt);
+ lp.add(loj);
+ log.debug("Added operator " + loj.getClass().getName() + " object " + loj + " to the logical plan " + lp);
+
+ for(LogicalOperator op: los) {
+ lp.connect(op, loj);
+ log.debug("Connected operator " + op.getClass().getName() + " to " + loj.getClass().getName() + " in the logical plan");
+ }
+
+ log.trace("Exiting parseJoin");
+ return loj;
+ }
+
+ /**
* Mimicking parseCogroup as the parsing logic for FRJoin remains exactly the same.
*/
LogicalOperator parseFRJoin(ArrayList<CogroupInput> gis, LogicalPlan lp) throws ParseException, PlanException{
@@ -1855,13 +1912,25 @@
log.trace("Entering JoinClause");
log.debug("LogicalPlan: " + lp);
LogicalOperator frj = null;
+ LogicalOperator skj = null;
}
{
(gi = GroupItem(lp) { gis.add(gi); }
("," gi = GroupItem(lp) { gis.add(gi); })+
// The addition of using replicated to indicate FRJoin
- [<USING> ("\"replicated\"" | "\"repl\"") { frj = parseFRJoin(gis, lp); }] )
- {log.trace("Exiting JoinClause"); return (frj==null) ? rewriteJoin(gis, lp) : frj;}
+ ([<USING> ("\"replicated\"" { frj = parseFRJoin(gis, lp); } | "\"repl\"" { frj=parseFRJoin(gis,lp);}
+ |"\"skewed\"" { skj = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED); })] ))
+
+ {log.trace("Exiting JoinClause");
+ if (frj!=null) {
+ return frj;
+ }
+ else if (skj!=null) {
+ return skj;
+ }
+ else {
+ return rewriteJoin(gis,lp);
+ }}
}
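
For illustration, a minimal driver (not part of this commit) exercising the syntax the grammar change above accepts; the file names and PARALLEL degree mirror the tests added later in this commit, and the output path is hypothetical:

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class SkewedJoinDriver {
        public static void main(String[] args) throws Exception {
            // per parseJoin() above, "skewed" falls back to a regular join
            // in local mode, so run on MapReduce to see the sampling job
            PigServer pig = new PigServer(ExecType.MAPREDUCE);
            pig.registerQuery("A = load 'SkewedJoinInput1.txt' as (id, name, n);");
            pig.registerQuery("B = load 'SkewedJoinInput2.txt' as (id, name);");
            pig.registerQuery(
                "C = join A by id, B by id using \"skewed\" parallel 5;");
            pig.store("C", "skewed_join_out");
        }
    }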
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=799141&r1=799140&r2=799141&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Thu Jul 30 00:30:25 2009
@@ -2245,6 +2245,131 @@
}
}
+ /**
+ * LOJoin visitor
+ */
+ protected void visit(LOJoin frj) throws VisitorException {
+ try {
+ frj.regenerateSchema();
+ } catch (FrontendException fe) {
+ int errCode = 1060;
+ String msg = "Cannot resolve Join output schema" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ throw new TypeCheckerException(msg, errCode, PigException.INPUT, fe) ;
+ }
+
+ MultiMap<LogicalOperator, LogicalPlan> joinColPlans
+ = frj.getJoinPlans() ;
+ List<LogicalOperator> inputs = frj.getInputs() ;
+
+ // Type checking internal plans.
+ for(int i=0;i < inputs.size(); i++) {
+ LogicalOperator input = inputs.get(i) ;
+ List<LogicalPlan> innerPlans
+ = new ArrayList<LogicalPlan>(joinColPlans.get(input)) ;
+
+ for(int j=0; j < innerPlans.size(); j++) {
+
+ LogicalPlan innerPlan = innerPlans.get(j) ;
+
+ // Check that the inner plan has only 1 output port
+ if (!innerPlan.isSingleLeafPlan()) {
+ int errCode = 1057;
+ String msg = "Join's inner plans can only"
+ + " have one output (leaf)" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
+ }
+
+ checkInnerPlan(innerPlans.get(j)) ;
+ }
+ }
+
+ try {
+
+ if (!frj.isTupleJoinCol()) {
+ // merge all the inner plan outputs so we know what type
+ // our group column should be
+
+ // TODO: Don't recompute schema here
+ //byte groupType = schema.getField(0).type ;
+ byte groupType = frj.getAtomicJoinColType() ;
+
+ // go through all inputs again to add cast if necessary
+ for(int i=0;i < inputs.size(); i++) {
+ LogicalOperator input = inputs.get(i) ;
+ List<LogicalPlan> innerPlans
+ = new ArrayList<LogicalPlan>(joinColPlans.get(input)) ;
+ // Checking innerPlan size already done above
+ byte innerType = innerPlans.get(0).getSingleLeafPlanOutputType() ;
+ if (innerType != groupType) {
+ insertAtomicCastForJoinInnerPlan(innerPlans.get(0),
+ frj,
+ groupType) ;
+ }
+ }
+ }
+ else {
+
+ // TODO: Don't recompute schema here
+ //Schema groupBySchema = schema.getField(0).schema ;
+ Schema groupBySchema = frj.getTupleJoinSchema() ;
+
+ // go through all inputs again to add cast if necessary
+ for(int i=0;i < inputs.size(); i++) {
+ LogicalOperator input = inputs.get(i) ;
+ List<LogicalPlan> innerPlans
+ = new ArrayList<LogicalPlan>(joinColPlans.get(input)) ;
+ for(int j=0;j < innerPlans.size(); j++) {
+ LogicalPlan innerPlan = innerPlans.get(j) ;
+ byte innerType = innerPlan.getSingleLeafPlanOutputType() ;
+ byte expectedType = DataType.BYTEARRAY ;
+
+ if (!DataType.isAtomic(innerType) && (DataType.TUPLE != innerType)) {
+ int errCode = 1057;
+ String msg = "Join's inner plans can only"
+ + "have one output (leaf)" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ throw new TypeCheckerException(msg, errCode, PigException.INPUT) ;
+ }
+
+ try {
+ expectedType = groupBySchema.getField(j).type ;
+ }
+ catch(FrontendException fee) {
+ int errCode = 1060;
+ String msg = "Cannot resolve Join output schema" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ throw new TypeCheckerException(msg, errCode, PigException.INPUT, fee) ;
+ }
+
+ if (innerType != expectedType) {
+ insertAtomicCastForJoinInnerPlan(innerPlan,
+ frj,
+ expectedType) ;
+ }
+ }
+ }
+ }
+ }
+ catch (FrontendException fe) {
+ int errCode = 1060;
+ String msg = "Cannot resolve Join output schema" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ throw new TypeCheckerException(msg, errCode, PigException.INPUT, fe) ;
+ }
+
+ try {
+ frj.regenerateSchema() ;
+ }
+ catch (FrontendException fe) {
+ int errCode = 1060;
+ String msg = "Cannot resolve Join output schema" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ throw new TypeCheckerException(msg, errCode, PigException.INPUT, fe) ;
+ }
+ }
+
/**
* COGroup
* All group by cols from all inputs have to be of the
@@ -2407,6 +2532,36 @@
this.visit(cast);
}
+ private void insertAtomicCastForJoinInnerPlan(LogicalPlan innerPlan,
+ LOJoin frj, byte toType) throws VisitorException {
+ if (!DataType.isUsableType(toType)) {
+ int errCode = 1051;
+ String msg = "Cannot cast to "
+ + DataType.findTypeName(toType);
+ throw new TypeCheckerException(msg, errCode, PigException.INPUT);
+ }
+
+ List<LogicalOperator> leaves = innerPlan.getLeaves();
+ if (leaves.size() > 1) {
+ int errCode = 2060;
+ String msg = "Expected one leaf. Found " + leaves.size() + " leaves.";
+ throw new TypeCheckerException(msg, errCode, PigException.BUG);
+ }
+ ExpressionOperator currentOutput = (ExpressionOperator) leaves.get(0);
+ collectCastWarning(frj, currentOutput.getType(), toType);
+ OperatorKey newKey = genNewOperatorKey(currentOutput);
+ LOCast cast = new LOCast(innerPlan, newKey, toType);
+ innerPlan.add(cast);
+ try {
+ innerPlan.connect(currentOutput, cast);
+ } catch (PlanException pe) {
+ int errCode = 2059;
+ String msg = "Problem with inserting cast operator for fragment replicate join in plan.";
+ throw new TypeCheckerException(msg, errCode, PigException.BUG, pe);
+ }
+ this.visit(cast);
+ }
+
// This helps insert casting to atomic types in COGroup's inner plans
// as a new leave of the plan
private void insertAtomicCastForCOGroupInnerPlan(LogicalPlan innerPlan,
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=799141&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Thu Jul 30 00:30:25 2009
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+
+import java.io.*;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.After;
+import org.junit.Before;
+
+public class TestSkewedJoin extends TestCase{
+ private static final String INPUT_FILE1 = "SkewedJoinInput1.txt";
+ private static final String INPUT_FILE2 = "SkewedJoinInput2.txt";
+ private static final String INPUT_FILE3 = "SkewedJoinInput3.txt";
+
+ private PigServer pigServer;
+ private MiniCluster cluster = MiniCluster.buildCluster();
+
+ public TestSkewedJoin() throws ExecException, IOException{
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ // pigServer = new PigServer(ExecType.LOCAL);
+ pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.maxtuple", "5");
+ pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.memusage", "0.1");
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ createFiles();
+ }
+
+ private void createFiles() throws IOException {
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE1));
+
+ int k = 0;
+ for(int j=0; j<12; j++) {
+ w.println("100\tapple1\taaa" + k);
+ k++;
+ w.println("200\torange1\tbbb" + k);
+ k++;
+ w.println("300\tstrawberry\tccc" + k);
+ k++;
+ }
+
+ w.close();
+
+ PrintWriter w2 = new PrintWriter(new FileWriter(INPUT_FILE2));
+ w2.println("100\tapple1");
+ w2.println("100\tapple2");
+ w2.println("100\tapple2");
+ w2.println("200\torange1");
+ w2.println("200\torange2");
+ w2.println("300\tstrawberry");
+ w2.println("400\tpear");
+
+ w2.close();
+
+ PrintWriter w3 = new PrintWriter(new FileWriter(INPUT_FILE3));
+ w3.println("100\tapple1");
+ w3.println("100\tapple2");
+ w3.println("200\torange1");
+ w3.println("200\torange2");
+ w3.println("300\tstrawberry");
+ w3.println("300\tstrawberry2");
+ w3.println("400\tpear");
+
+ w3.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE1, INPUT_FILE1);
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2);
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ new File(INPUT_FILE1).delete();
+ new File(INPUT_FILE2).delete();
+ new File(INPUT_FILE3).delete();
+
+ Util.deleteFile(cluster, INPUT_FILE1);
+ Util.deleteFile(cluster, INPUT_FILE2);
+ Util.deleteFile(cluster, INPUT_FILE3);
+ }
+
+
+ public void testSkewedJoinWithGroup() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
+ pigServer.registerQuery("C = GROUP A by id;");
+ pigServer.registerQuery("D = GROUP B by id;");
+
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("E = join C by group, D by group using \"skewed\" parallel 5;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("E = join C by group, D by group;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while(iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ }
+
+ public void testSkewedJoinReducers() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
+ try {
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("C = join A by id, B by id using \"skewed\" parallel 1;");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ }catch(Exception e) {
+ return;
+ }
+
+ fail("Should throw exception, not enough reducers");
+ }
+
+ public void testSkewedJoin3Way() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
+ pigServer.registerQuery("C = LOAD '" + INPUT_FILE3 + "' as (id, name);");
+ try {
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("D = join A by id, B by id, C by id using \"skewed\" parallel 5;");
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ }catch(Exception e) {
+ return;
+ }
+
+ fail("Should throw exception, do not support 3 way join");
+ }
+}