You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/10/30 21:23:56 UTC
svn commit: r831443 [1/4] - in /hadoop/pig/branches/load-store-redesign: ./
lib-src/bzip2/org/apache/tools/bzip2r/ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ ...
Author: pradeepkth
Date: Fri Oct 30 20:23:54 2009
New Revision: 831443
URL: http://svn.apache.org/viewvc?rev=831443&view=rev
Log:
svn merge -r829891:831153 http://svn.apache.org/repos/asf/hadoop/pig/trunk
Added:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/CommittableStoreFunc.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/IndexableLoadFunc.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SortColInfo.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SortInfo.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalDistinctBag.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalSortedBag.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/Utils.java
Modified:
hadoop/pig/branches/load-store-redesign/CHANGES.txt
hadoop/pig/branches/load-store-redesign/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Main.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/PigServer.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StandAloneParser.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreConfig.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/Distinct.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntSum.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/MAX.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/MIN.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/SUM.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultAbstractBag.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultTuple.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalCachedBag.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/NonSpillableDataBag.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOCross.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOForEach.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOProject.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOSort.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/CompilationMessageCollector.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/PlanPrinter.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/optimizer/RulePlanPrinter.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/JarManager.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/parameters/PreprocessorContext.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/streams/StreamGenerator.java
hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCombiner.java
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestDataBag.java
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogToPhyCompiler.java
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMergeJoin.java
Modified: hadoop/pig/branches/load-store-redesign/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/CHANGES.txt?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/CHANGES.txt (original)
+++ hadoop/pig/branches/load-store-redesign/CHANGES.txt Fri Oct 30 20:23:54 2009
@@ -26,6 +26,20 @@
IMPROVEMENTS
+PIG-953: Enable merge join in pig to work with loaders and store functions
+which can internally index sorted data (pradeepkth)
+
+PIG-1055: FINDBUGS: remaining "Dodgy Warnings" (olgan)
+
+PIG-1052: FINDBUGS: remaining performance warnings (olgan)
+
+PIG-1037: Converted sorted and distinct bags to use the new active spilling
+ paradigm (yinghe via gates)
+
+PIG-1051: FINDBUGS: NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE (olgan)
+
+PIG-1050: FINDBUGS: DLS_DEAD_LOCAL_STORE: Dead store to local variable (olgan)
+
PIG-1045: Integration with Hadoop 20 New API (rding via pradeepkth)
PIG-1043: FINDBUGS: SIC_INNER_SHOULD_BE_STATIC: Should be a static inner class
@@ -91,6 +105,9 @@
BUG FIXES
+PIG-746: Works in --exectype local, fails on grid - ERROR 2113: SingleTupleBag
+should never be serialized (rding via pradeepkth)
+
PIG-1027: Number of bytes written are always zero in local mode (zjffdu via gates)
PIG-976: Multi-query optimization throws ClassCastException (rding via
@@ -130,6 +147,8 @@
PIG-927: null should be handled consistently in Join (daijy)
+PIG-790: Error message should indicate in which line number in the Pig script the error occurred (debugging BinCond) (daijy)
+
Release 0.5.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/pig/branches/load-store-redesign/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java (original)
+++ hadoop/pig/branches/load-store-redesign/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java Fri Oct 30 20:23:54 2009
@@ -1050,9 +1050,7 @@
b = t;
}
if (b > c) {
- t = b;
b = c;
- c = t;
}
if (a > b) {
b = a;
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/CommittableStoreFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/CommittableStoreFunc.java?rev=831443&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/CommittableStoreFunc.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/CommittableStoreFunc.java Fri Oct 30 20:23:54 2009
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+
+/**
+ * A storefunc which has an extra commit() method which is called
+ * when all mappers (when the storefunc is part of map) or reducers (when the
+ * storefunc is part of reduce) are finished. Currently this will allow storefuncs
+ * to do any cleanup/finalizing activities knowing that all the maps/reducers
+ * have finished - one such use case is for zebra storage to build an index
+ * for sorted files once all writes are done.
+ */
+public interface CommittableStoreFunc extends StoreFunc {
+ /**
+ * This method is called when all mappers (when the storefunc is part of
+ * map) or reducers (when the storefunc is part of reduce) are finished.
+ * This allows the storeFunc to do any global commit actions - only called
+ * when all mappers/reducers successfully complete.
+ *
+ * If the StoreFunc needs to get hold of StoreConfig object for the store
+ * it can call {@link MapRedUtil#getStoreConfig(Configuration)} where
+ * conf is the Configuration object passed in the commit() call.
+ *
+ * @param conf Configuration object for the job
+ */
+ public void commit(Configuration conf) throws IOException;
+}
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/IndexableLoadFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/IndexableLoadFunc.java?rev=831443&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/IndexableLoadFunc.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/IndexableLoadFunc.java Fri Oct 30 20:23:54 2009
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.data.Tuple;
+
+/**
+ * This interface is intended for use by LoadFunc implementations
+ * which have an internal index for sorted data and can use the index
+ * to support merge join in pig. Interaction with the index
+ * is abstracted away by the methods in this interface which the pig
+ * runtime will call in a particular sequence to get the records it
+ * needs to perform the merge based join.
+ *
+ * The sequence of calls made from the pig runtime are:
+ *
+ * {@link IndexableLoadFunc#initialize(Configuration)}
+ * IndexableLoadFunc.bindTo(filename, bufferedPositionedInputStream, 0, Long.MAX_VALUE);
+ * (the bufferedPositionedInputStream is a decorator around the underlying
+ * DFS input stream)
+ * IndexableLoadFunc.seekNear(keys);
+ * A series of IndexableLoadFunc.getNext(); calls to perform the join
+ * IndexableLoadFunc.close();
+ *
+ */
+public interface IndexableLoadFunc extends LoadFunc {
+
+ /**
+ * This method is called by pig run time to allow the
+ * IndexableLoadFunc to perform any initialization actions
+ * @param conf The job configuration object
+ */
+ public void initialize(Configuration conf) throws IOException;
+
+ /**
+ * This method is called by the pig runtime to indicate
+ * to the LoadFunc to position its underlying input stream
+ * near the keys supplied as the argument. Specifically:
+ * 1) if the keys are present in the input stream, the loadfunc
+ * implementation should position its read position to
+ * a record where the key(s) is/are the biggest key(s) less than
+ * the key(s) supplied in the argument OR to the record with the
+ * first occurrence of the key(s) supplied.
+ * 2) if the key(s) are absent in the input stream, the implementation
+ * should position its read position to a record where the key(s)
+ * is/are the biggest key(s) less than the key(s) supplied OR to the
+ * first record where the key(s) is/are the smallest key(s) greater
+ * than the key(s) supplied.
+ * The description above holds for descending order data in
+ * a similar manner with "biggest" and "less than" replaced with
+ * "smallest" and "greater than" and vice versa.
+ *
+ * @param keys Tuple with join keys (which are a prefix of the sort
+ * keys of the input data). For example if the data is sorted on
+ * columns in position 2,4,5 any of the following Tuples are
+ * valid as an argument value:
+ * (fieldAt(2))
+ * (fieldAt(2), fieldAt(4))
+ * (fieldAt(2), fieldAt(4), fieldAt(5))
+ *
+ * The following are some invalid cases:
+ * (fieldAt(4))
+ * (fieldAt(2), fieldAt(5))
+ * (fieldAt(4), fieldAt(5))
+ *
+ * @throws IOException When the loadFunc is unable to position
+ * to the required point in its input stream
+ */
+ public void seekNear(Tuple keys) throws IOException;
+
+
+ /**
+ * A method called by the pig runtime to give an opportunity
+ * for implementations to perform cleanup actions like closing
+ * the underlying input stream. This is necessary since while
+ * performing a join the pig run time may determine that no further
+ * join is possible with remaining records and may indicate to the
+ * IndexableLoader to cleanup by calling this method.
+ *
+ * @throws IOException if the loadfunc is unable to perform
+ * its close actions.
+ */
+ public void close() throws IOException;
+}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java Fri Oct 30 20:23:54 2009
@@ -34,7 +34,7 @@
* not possible to return a schema that represents all returned data,
* then null should be returned.
* This method will be called after a
- * {@link LoadFunc#setLocation(String, org.apache.hadoop.conf.Configuration)}
+ * {@link LoadFunc#setLocation(String, org.apache.hadoop.mapreduce.Job)}
* call is made on the Loader implementing {@link LoadFunc} and {@link LoadMetadata}
*/
ResourceSchema getSchema();
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Main.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Main.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Main.java Fri Oct 30 20:23:54 2009
@@ -532,7 +532,6 @@
private static String getVersionString() {
String findContainingJar = JarManager.findContainingJar(Main.class);
try {
- StringBuffer buffer = new StringBuffer();
JarFile jar = new JarFile(findContainingJar);
final Manifest manifest = jar.getManifest();
final Map <String,Attributes> attrs = manifest.getEntries();
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/PigServer.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/PigServer.java Fri Oct 30 20:23:54 2009
@@ -320,14 +320,17 @@
}
if (urls.hasMoreElements()) {
- String logMessage = "Found multiple resources that match "
- + jarName + ": " + resourceLocation;
+ StringBuffer sb = new StringBuffer("Found multiple resources that match ");
+ sb.append(jarName);
+ sb.append(": ");
+ sb.append(resourceLocation);
while (urls.hasMoreElements()) {
- logMessage += (logMessage + urls.nextElement() + "; ");
+ sb.append(urls.nextElement());
+ sb.append("; ");
}
- log.debug(logMessage);
+ log.debug(sb.toString());
}
return resourceLocation;
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SortColInfo.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SortColInfo.java?rev=831443&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SortColInfo.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SortColInfo.java Fri Oct 30 20:23:54 2009
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.Serializable;
+
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * A class representing information about a sort column to pass
+ * in {@link SortInfo} to storage functions in {@link StoreConfig}
+ */
+public class SortColInfo implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ // name of sort column
+ private String colName;
+
+ // index position (0 based) of sort column
+ private int colIndex;
+
+ public enum Order { ASCENDING, DESCENDING }
+
+ private Order sortOrder;
+
+
+ /**
+ * @param colName sort column name
+ * @param colIndex index position (0 based) of sort column
+ * @param orderingType whether the column is sorted ascending or descending
+ */
+ public SortColInfo(String colName, int colIndex, Order orderingType) {
+ this.colName = colName;
+ this.colIndex = colIndex;
+ this.sortOrder = orderingType;
+ }
+
+ /**
+ * @return the sort column name - could be null or empty string if
+ * column name could not be determined either because of the absence of
+ * a schema or because the schema had the column name as null or empty
+ * string - caller should check for these conditions.
+ */
+ public String getColName() {
+ return colName;
+ }
+
+ /**
+ * @return index position (0 based) of sort column
+ */
+ public int getColIndex() {
+ return colIndex;
+ }
+
+ /**
+ * @return whether the column is sorted ascending or descending
+ */
+ public Order getSortOrder() {
+ return sortOrder;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((colName == null) ? 0 : colName.hashCode());
+ result = prime * result + colIndex;
+ result = prime * result + ((sortOrder == Order.ASCENDING) ? 1 : 2);
+ return result;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if(!Utils.checkNullAndClass(this, obj)) {
+ return false;
+ }
+ SortColInfo other = (SortColInfo)obj;
+ return Utils.checkNullEquals(this.colName, other.colName, true) &&
+ this.colIndex == other.colIndex &&
+ this.sortOrder == other.sortOrder;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return "colname:" + colName +",colindex:" + colIndex + ",orderingType:"
+ + (sortOrder == Order.ASCENDING ? "ascending" : "descending");
+ }
+
+
+
+
+}
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SortInfo.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SortInfo.java?rev=831443&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SortInfo.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SortInfo.java Fri Oct 30 20:23:54 2009
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * Class to communicate sort column information based on
+ * order by statement's sort columns and schema
+ */
+public class SortInfo implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ boolean isGloballySorted = true; // in pig this is the default
+
+ List<SortColInfo> sortColInfoList;
+
+ /**
+ * @param sortColInfoList
+ */
+ public SortInfo(List<SortColInfo> sortColInfoList){
+ this.sortColInfoList = sortColInfoList;
+ }
+
+ /**
+ * @return the sortColInfoList
+ */
+ public List<SortColInfo> getSortColInfoList() {
+ return new ArrayList<SortColInfo>(sortColInfoList);
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((sortColInfoList == null) ? 0 :
+ sortColInfoList.hashCode());
+ result = prime * result + (isGloballySorted ? 1: 0);
+ return result;
+ }
+
+ /**
+ * @return the isGloballySorted
+ */
+ public boolean isGloballySorted() {
+ return isGloballySorted;
+ }
+
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if(!Utils.checkNullAndClass(this, obj)) {
+ return false;
+ }
+ SortInfo other = (SortInfo)obj;
+ return (
+ isGloballySorted == other.isGloballySorted &&
+ Utils.checkNullEquals(sortColInfoList, other.sortColInfoList, true));
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return "GlobalSort:" + isGloballySorted +", sort column info list:" + sortColInfoList;
+ }
+
+
+
+}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StandAloneParser.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StandAloneParser.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StandAloneParser.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StandAloneParser.java Fri Oct 30 20:23:54 2009
@@ -54,9 +54,11 @@
return;
}
- if (line.toLowerCase().equals("quit")) break;
- if (line.toLowerCase().startsWith("#")) continue;
- else tryParse(line);
+ if (line != null){
+ if (line.toLowerCase().equals("quit")) break;
+ if (line.toLowerCase().startsWith("#")) continue;
+ else tryParse(line);
+ }
}
@@ -70,10 +72,10 @@
pig.registerQuery(query);
System.out.print("Current aliases: ");
Map<String, LogicalPlan> aliasPlan = pig.getAliases();
- for (Iterator<String> it = aliasPlan.keySet().iterator(); it.hasNext(); ) {
- String alias = it.next();
- LogicalPlan lp = aliasPlan.get(alias);
- System.out.print(alias + "->" + lp.getLeaves().get(0).getSchema());
+ for (Iterator<Map.Entry<String,LogicalPlan>> it = aliasPlan.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<String, LogicalPlan> e = it.next();
+ LogicalPlan lp = e.getValue();
+ System.out.print(e.getKey() + "->" + lp.getLeaves().get(0).getSchema());
if (it.hasNext()) System.out.print(", \n");
else System.out.print("\n");
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreConfig.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreConfig.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreConfig.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreConfig.java Fri Oct 30 20:23:54 2009
@@ -20,15 +20,17 @@
private static final long serialVersionUID = 1L;
private String location;
private Schema schema;
+ private SortInfo sortInfo;
/**
* @param location
* @param schema
*/
- public StoreConfig(String location, Schema schema) {
+ public StoreConfig(String location, Schema schema, SortInfo sortInfo) {
this.location = location;
this.schema = schema;
+ this.sortInfo = sortInfo;
}
/**
@@ -64,4 +66,33 @@
return "{location:" + location + ", schema:" + schema + "}";
}
+ /**
+ * @param sortInfo the sortInfo to set
+ */
+ public void setSortInfo(SortInfo sortInfo) {
+ this.sortInfo = sortInfo;
+ }
+
+ /**
+ * This method returns a {@link SortInfo} object giving
+ * information on the column names in the output schema which
+ * correspond to the sort columns and which columns are
+ * ascending and those which are descending
+ * @return the sortInfo object if one could be determined else null
+ * null is returned in the following scenarios (wherein
+ * the sortInfo could not be determined):
+ * 1) the store does not follow an order by
+ * 2) There are operators other than limit between "order by"
+ * and store. If there is a limit between order by and store and
+ * if none of the above conditions are true, then sortInfo will be
+ * non-null.
+ *
+ * IMPORTANT NOTE:
+ * The caller should check if the return value is null and
+ * take appropriate action
+ */
+ public SortInfo getSortInfo() {
+ return sortInfo;
+ }
+
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java Fri Oct 30 20:23:54 2009
@@ -70,8 +70,8 @@
* can check that the correct partition keys are included;
* a storage function to be written directly to an OutputFormat can
* make sure the schema will translate in a well defined way.
- * @param schema to be checked/set
- * @throw IOException if this schema is not acceptable. It should include
+ * @param s to be checked/set
+ * @throws IOException if this schema is not acceptable. It should include
* a detailed error message indicating what is wrong with the schema.
*/
void setSchema(ResourceSchema s) throws IOException;
@@ -95,7 +95,7 @@
* Write a tuple the output stream to which this instance was
* previously bound.
*
- * @param f the tuple to store.
+ * @param t the tuple to store.
* @throws IOException
*/
void putNext(Tuple t) throws IOException;
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri Oct 30 20:23:54 2009
@@ -353,7 +353,6 @@
return new String[] {hodHDFS, hodMapRed};
}
- try {
// first, create temp director to store the configuration
hodConfDir = createTempDir(server);
@@ -431,12 +430,6 @@
hodMapRed = mapred;
return new String[] {hdfs, mapred};
- }
- catch (Exception e) {
- int errCode = 6010;
- String msg = "Could not connect to HOD";
- throw new ExecException(msg, errCode, PigException.REMOTE_ENVIRONMENT, e);
- }
}
private synchronized void closeHod(String server){
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Fri Oct 30 20:23:54 2009
@@ -406,6 +406,11 @@
// the reduce side.
if (pp.getPredecessors(proj) != null) return ExprType.NOT_ALGEBRAIC;
+ // Check if it's a projection of bag. Currently we can't use combiner
+ // for statement like c = foreach b generate group, SUM(a), a;
+ // where a is a bag.
+ if (proj.getResultType() == DataType.BAG) return ExprType.NOT_ALGEBRAIC;
+
// Check to see if this is a projection of the grouping column.
// If so, it will be a projection of col 0 and will have no
// predecessors (to avoid things like group.$0, which isn't what we
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Oct 30 20:23:54 2009
@@ -435,7 +435,8 @@
// hadoop from the jobconf.
conf.set("pig.storeFunc", ObjectSerializer.serialize(outputFuncSpec.toString()));
conf.set(PIG_STORE_CONFIG,
- ObjectSerializer.serialize(new StoreConfig(outputPath, st.getSchema())));
+ ObjectSerializer.serialize(
+ new StoreConfig(outputPath, st.getSchema(), st.getSortInfo())));
conf.set("pig.streaming.log.dir",
new Path(outputPath, LOG_DIR).toString());
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java Fri Oct 30 20:23:54 2009
@@ -60,7 +60,7 @@
String newLine = "\n";
boolean pigException = false;
boolean outOfMemory = false;
- final String OOM_ERR = "OutOfMemoryError";
+ static final String OOM_ERR = "OutOfMemoryError";
protected List<FileSpec> succeededStores = null;
protected List<FileSpec> failedStores = null;
@@ -470,7 +470,11 @@
}
//could receive a number format exception here but it will be propagated up the stack
- int errCode = Integer.parseInt(code);
+ int errCode;
+ if (code != null)
+ errCode = Integer.parseInt(code);
+ else
+ errCode = 2998;
//create the exception with the message and then set the error code and error source
FuncSpec funcSpec = new FuncSpec(exceptionName, exceptionMessage);
@@ -570,12 +574,15 @@
String declaringClass = items[0];
//the last member is always the method name
String methodName = items[items.length - 1];
+ StringBuilder sb = new StringBuilder(declaringClass); // seed with items[0]; the loop below only appends the members after it
//concatenate the names by adding the dot (.) between the members till the penultimate member
for(int i = 1; i < items.length - 1; ++i) {
- declaringClass += ".";
- declaringClass += items[i];
+ sb.append('.');
+ sb.append(items[i]);
}
+
+ declaringClass = sb.toString();
//from the file details extract the file name and the line number
//PigMapOnly.java:65
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Oct 30 20:23:54 2009
@@ -32,12 +32,15 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.FuncSpec;
+import org.apache.pig.IndexableLoadFunc;
+import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.SamplableLoader;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.DefaultIndexableLoader;
import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.builtin.PoissonSampleLoader;
import org.apache.pig.impl.builtin.MergeJoinIndexer;
@@ -1148,92 +1151,131 @@
rightMROpr.requestedParallelism = 1; // we need exactly one reducer for indexing job.
// At this point, we must be operating on map plan of right input and it would contain nothing else other then a POLoad.
- POLoad rightLoader = (POLoad)rightMROpr.mapPlan.getRoots().get(0);
- joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
-
- // Replace POLoad with indexer.
- String[] indexerArgs = new String[3];
- indexerArgs[0] = rightLoader.getLFile().getFuncSpec().toString();
- if (! (PigContext.instantiateFuncFromSpec(indexerArgs[0]) instanceof SamplableLoader)){
- int errCode = 1104;
- String errMsg = "Right input of merge-join must implement SamplableLoader interface. The specified loader " + indexerArgs[0] + " doesn't implement it";
- throw new MRCompilerException(errMsg,errCode);
- }
- List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1);
- indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans);
- indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
- FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
- rightLoader.setLFile(lFile);
-
- // Loader of mro will return a tuple of form (key1, key2, ..,filename, offset)
- // Now set up a POLocalRearrange which has "all" as the key and tuple fetched
- // by loader as the "value" of POLocalRearrange
- // Sorting of index can possibly be achieved by using Hadoop sorting between map and reduce instead of Pig doing sort. If that is so,
- // it will simplify lot of the code below.
-
- PhysicalPlan lrPP = new PhysicalPlan();
- ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
- ce.setValue("all");
- ce.setResultType(DataType.CHARARRAY);
- lrPP.add(ce);
-
- List<PhysicalPlan> lrInnerPlans = new ArrayList<PhysicalPlan>();
- lrInnerPlans.add(lrPP);
-
- POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
- lr.setIndex(0);
- lr.setKeyType(DataType.CHARARRAY);
- lr.setPlans(lrInnerPlans);
- lr.setResultType(DataType.TUPLE);
- rightMROpr.mapPlan.addAsLeaf(lr);
-
- rightMROpr.setMapDone(true);
-
- // On the reduce side of this indexing job, there will be a global rearrange followed by POSort.
- // Output of POSort will be index file dumped on the DFS.
-
- // First add POPackage.
- POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pkg.setKeyType(DataType.CHARARRAY);
- pkg.setNumInps(1);
- pkg.setInner(new boolean[]{false});
- rightMROpr.reducePlan.add(pkg);
-
- // Next project tuples from the bag created by POPackage.
- POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
- topPrj.setColumn(1);
- topPrj.setResultType(DataType.TUPLE);
- topPrj.setOverloaded(true);
- rightMROpr.reducePlan.add(topPrj);
- rightMROpr.reducePlan.connect(pkg, topPrj);
-
- // Now create and add POSort. Sort plan is project *.
- List<PhysicalPlan> sortPlans = new ArrayList<PhysicalPlan>(1);
- PhysicalPlan innerSortPlan = new PhysicalPlan();
- POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
- prj.setStar(true);
- prj.setOverloaded(false);
- prj.setResultType(DataType.TUPLE);
- innerSortPlan.add(prj);
- sortPlans.add(innerSortPlan);
-
- // Currently we assume all columns are in asc order.
- // Add two because filename and offset are added by Indexer in addition to keys.
- List<Boolean> mAscCols = new ArrayList<Boolean>(rightInpPlans.size()+2);
- for(int i=0; i< rightInpPlans.size()+2; i++)
- mAscCols.add(true);
-
- POSort sortOp = new POSort(new OperatorKey(scope,nig.getNextNodeId(scope)),1, null, sortPlans, mAscCols, null);
- rightMROpr.reducePlan.add(sortOp);
- rightMROpr.reducePlan.connect(topPrj, sortOp);
-
- POStore st = getStore();
- FileSpec strFile = getTempFileSpec();
- st.setSFile(strFile);
- rightMROpr.reducePlan.addAsLeaf(st);
- rightMROpr.setReduceDone(true);
+ POLoad rightLoader = (POLoad)rightMROpr.mapPlan.getRoots().get(0);
+ LoadFunc rightLoadFunc = (LoadFunc) PigContext.instantiateFuncFromSpec(rightLoader.getLFile().getFuncSpec());
+ if(rightLoadFunc instanceof IndexableLoadFunc) {
+ joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
+ joinOp.setRightInputFileName(rightLoader.getLFile().getFileName());
+ rightMROpr = null; // we don't need the right MROper since
+ // the right loader is an IndexableLoadFunc which can handle the index
+ // itself
+ // validate that the join keys in merge join are only
+ // simple column projections or '*' and not expression - expressions
+ // cannot be handled when the index is built by the storage layer on the sorted
+ // data when the sorted data (and corresponding index) is written.
+ // So merge join will be restricted not have expressions as
+ // join keys
+ int numInputs = mPlan.getPredecessors(joinOp).size(); // should be 2
+ for(int i = 0; i < numInputs; i++) {
+ List<PhysicalPlan> keyPlans = joinOp.getInnerPlansOf(i);
+ for (PhysicalPlan keyPlan : keyPlans) {
+ for(PhysicalOperator op : keyPlan) {
+ if(!(op instanceof POProject)) {
+ int errCode = 1106;
+ String errMsg = "Merge join is possible only for simple column or '*' join keys when using " +
+ rightLoader.getLFile().getFuncSpec() + " as the loader";
+ throw new MRCompilerException(errMsg, errCode, PigException.INPUT);
+ }
+ }
+ }
+ }
+ } else {
+ // Replace POLoad with indexer.
+ String[] indexerArgs = new String[3];
+ FileSpec origRightLoaderFileSpec = rightLoader.getLFile();
+ indexerArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
+ if (! (PigContext.instantiateFuncFromSpec(indexerArgs[0]) instanceof SamplableLoader)){
+ int errCode = 1104;
+ String errMsg = "Right input of merge-join must implement SamplableLoader interface. The specified loader " + indexerArgs[0] + " doesn't implement it";
+ throw new MRCompilerException(errMsg,errCode);
+ }
+ List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1);
+ indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans);
+ indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
+ FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
+ rightLoader.setLFile(lFile);
+
+ // Loader of mro will return a tuple of form (key1, key2, ..,filename, offset)
+ // Now set up a POLocalRearrange which has "all" as the key and tuple fetched
+ // by loader as the "value" of POLocalRearrange
+ // Sorting of index can possibly be achieved by using Hadoop sorting between map and reduce instead of Pig doing sort. If that is so,
+ // it will simplify lot of the code below.
+
+ PhysicalPlan lrPP = new PhysicalPlan();
+ ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ ce.setValue("all");
+ ce.setResultType(DataType.CHARARRAY);
+ lrPP.add(ce);
+
+ List<PhysicalPlan> lrInnerPlans = new ArrayList<PhysicalPlan>();
+ lrInnerPlans.add(lrPP);
+
+ POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ lr.setIndex(0);
+ lr.setKeyType(DataType.CHARARRAY);
+ lr.setPlans(lrInnerPlans);
+ lr.setResultType(DataType.TUPLE);
+ rightMROpr.mapPlan.addAsLeaf(lr);
+
+ rightMROpr.setMapDone(true);
+
+ // On the reduce side of this indexing job, there will be a global rearrange followed by POSort.
+ // Output of POSort will be index file dumped on the DFS.
+
+ // First add POPackage.
+ POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ pkg.setKeyType(DataType.CHARARRAY);
+ pkg.setNumInps(1);
+ pkg.setInner(new boolean[]{false});
+ rightMROpr.reducePlan.add(pkg);
+
+ // Next project tuples from the bag created by POPackage.
+ POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ topPrj.setColumn(1);
+ topPrj.setResultType(DataType.TUPLE);
+ topPrj.setOverloaded(true);
+ rightMROpr.reducePlan.add(topPrj);
+ rightMROpr.reducePlan.connect(pkg, topPrj);
+
+ // Now create and add POSort. Sort plan is project *.
+ List<PhysicalPlan> sortPlans = new ArrayList<PhysicalPlan>(1);
+ PhysicalPlan innerSortPlan = new PhysicalPlan();
+ POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ prj.setStar(true);
+ prj.setOverloaded(false);
+ prj.setResultType(DataType.TUPLE);
+ innerSortPlan.add(prj);
+ sortPlans.add(innerSortPlan);
+
+ // Currently we assume all columns are in asc order.
+ // Add two because filename and offset are added by Indexer in addition to keys.
+ List<Boolean> mAscCols = new ArrayList<Boolean>(rightInpPlans.size()+2);
+ for(int i=0; i< rightInpPlans.size()+2; i++)
+ mAscCols.add(true);
+
+ POSort sortOp = new POSort(new OperatorKey(scope,nig.getNextNodeId(scope)),1, null, sortPlans, mAscCols, null);
+ rightMROpr.reducePlan.add(sortOp);
+ rightMROpr.reducePlan.connect(topPrj, sortOp);
+
+ POStore st = getStore();
+ FileSpec strFile = getTempFileSpec();
+ st.setSFile(strFile);
+ rightMROpr.reducePlan.addAsLeaf(st);
+ rightMROpr.setReduceDone(true);
+
+ // set up the DefaultIndexableLoader for the join operator
+ String[] defaultIndexableLoaderArgs = new String[4];
+ defaultIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
+ defaultIndexableLoaderArgs[1] = strFile.getFileName();
+ defaultIndexableLoaderArgs[2] = strFile.getFuncSpec().toString();
+ defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope;
+ joinOp.setRightLoaderFuncSpec((new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs)));
+ joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
+
+ }
+
- joinOp.setIndexFile(strFile);
+// joinOp.setIndexFile(strFile);
// We are done with right side. Lets work on left now.
// Join will be materialized in leftMROper.
@@ -1254,9 +1296,10 @@
String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
throw new PlanException(msg, errCode, PigException.BUG);
}
-
- // We want to ensure indexing job runs prior to actual join job. So, connect them in order.
- MRPlan.connect(rightMROpr, curMROp);
+ if(rightMROpr != null) {
+ // We want to ensure indexing job runs prior to actual join job. So, connect them in order.
+ MRPlan.connect(rightMROpr, curMROp);
+ }
phyToMROpMap.put(joinOp, curMROp);
}
catch(PlanException e){
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Fri Oct 30 20:23:54 2009
@@ -33,6 +33,7 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.pig.ResourceSchema;
+import org.apache.pig.SortInfo;
import org.apache.pig.StoreConfig;
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
@@ -73,7 +74,7 @@
@SuppressWarnings("unchecked")
@Override
- public StoreFunc createStoreFunc(FileSpec sFile, Schema schema)
+ public StoreFunc createStoreFunc(FileSpec sFile, Schema schema, SortInfo sortInfo)
throws IOException {
Configuration outputConf = context.getConfiguration();
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Fri Oct 30 20:23:54 2009
@@ -299,8 +299,6 @@
rp.explain(baos);
log.debug(baos.toString());
}
- // till here
-
pigReporter = new ProgressableReporter();
if(!(rp.isEmpty())) {
roots = rp.getRoots().toArray(new PhysicalOperator[1]);
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Fri Oct 30 20:23:54 2009
@@ -58,7 +58,6 @@
List<POStore> reduceStores;
/**
- * @param outputPath
* @param context
* @throws IOException
*/
@@ -110,14 +109,11 @@
*/
@Override
public void cleanupJob(JobContext context) throws IOException {
- System.err.println("XXX: IN CLEANUPJOB");
// call clean up on all map and reduce committers
for (OutputCommitter mapCommitter : mapOutputCommitters) {
- System.err.println("XXX: calling cleanup on map committers");
mapCommitter.cleanupJob(context);
}
for (OutputCommitter reduceCommitter : reduceOutputCommitters) {
- System.err.println("XXX: calling cleanup on reduce committers");
reduceCommitter.cleanupJob(context);
}
@@ -130,12 +126,10 @@
public void abortTask(TaskAttemptContext context) throws IOException {
if(context.getTaskAttemptID().isMap()) {
for (OutputCommitter mapCommitter : mapOutputCommitters) {
- System.err.println("XXX: calling abort task on map committers");
mapCommitter.abortTask(context);
}
} else {
for (OutputCommitter reduceCommitter : reduceOutputCommitters) {
- System.err.println("XXX: calling abort task on reduce committers");
reduceCommitter.abortTask(context);
}
}
@@ -148,12 +142,10 @@
public void commitTask(TaskAttemptContext context) throws IOException {
if(context.getTaskAttemptID().isMap()) {
for (OutputCommitter mapCommitter : mapOutputCommitters) {
- System.err.println("XXX: calling commitTask on map committers");
mapCommitter.commitTask(context);
}
} else {
for (OutputCommitter reduceCommitter : reduceOutputCommitters) {
- System.err.println("XXX: calling commitTask on reduce committers");
reduceCommitter.commitTask(context);
}
}
@@ -168,17 +160,13 @@
boolean needCommit = false;
if(context.getTaskAttemptID().isMap()) {
for (OutputCommitter mapCommitter : mapOutputCommitters) {
- System.err.println("XXX: calling needsTaskCommit on map committers");
needCommit = needCommit || mapCommitter.needsTaskCommit(context);
}
- System.err.println("XXX: returning " + needCommit + " for needscommmut");
return needCommit;
} else {
for (OutputCommitter reduceCommitter : reduceOutputCommitters) {
- System.err.println("XXX: calling needsTaskCommit on reduce committers");
needCommit = needCommit || reduceCommitter.needsTaskCommit(context);
}
- System.err.println("XXX: returning " + needCommit + " for needscommmut");
return needCommit;
}
}
@@ -188,14 +176,11 @@
*/
@Override
public void setupJob(JobContext context) throws IOException {
- System.err.println("XXX: in SETUPJOB");
// call set up on all map and reduce committers
for (OutputCommitter mapCommitter : mapOutputCommitters) {
- System.err.println("XXX: calling setup on map committers");
mapCommitter.setupJob(context);
}
for (OutputCommitter reduceCommitter : reduceOutputCommitters) {
- System.err.println("XXX: calling setup on reduce committers");
reduceCommitter.setupJob(context);
}
}
@@ -207,15 +192,12 @@
public void setupTask(TaskAttemptContext context) throws IOException {
if(context.getTaskAttemptID().isMap()) {
for (OutputCommitter mapCommitter : mapOutputCommitters) {
- System.err.println("XXX: calling setupTask on map committers");
mapCommitter.setupTask(context);
}
} else {
for (OutputCommitter reduceCommitter : reduceOutputCommitters) {
- System.err.println("XXX: calling setupTask on reduce committers");
reduceCommitter.setupTask(context);
}
}
}
}
-
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Fri Oct 30 20:23:54 2009
@@ -31,6 +31,7 @@
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.ObjectSerializer;
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java Fri Oct 30 20:23:54 2009
@@ -187,7 +187,7 @@
@Override
protected String[] getAttributes(InnerOperator op) {
String[] attributes = new String[3];
- attributes[0] = "label=\""+getName(op)+"\"";
+ attributes[0] = "label=\""+super.getName(op)+"\"";
attributes[1] = "style=\"filled\"";
attributes[2] = "fillcolor=\"white\"";
return attributes;
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Fri Oct 30 20:23:54 2009
@@ -33,6 +33,7 @@
import org.apache.pig.FuncSpec;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
+import org.apache.pig.SortInfo;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.NonSpillableDataBag;
@@ -1365,6 +1366,11 @@
}
sort.setResultType(s.getType());
+ try {
+ sort.setSortInfo(s.getSortInfo());
+ } catch (FrontendException e) {
+ throw new LogicalToPhysicalTranslatorException(e);
+ }
}
@@ -1556,6 +1562,7 @@
@Override
public void visit(LOStore loStore) throws VisitorException {
String scope = loStore.getOperatorKey().scope;
+
POStore store = new POStore(new OperatorKey(scope, nodeGen
.getNextNodeId(scope)));
store.setSFile(loStore.getOutputFile());
@@ -1581,6 +1588,19 @@
if(op != null) {
from = logToPhyMap.get(op.get(0));
+ SortInfo sortInfo = null;
+ // if store's predecessor is limit,
+ // check limit's predecessor
+ if(op.get(0) instanceof LOLimit) {
+ op = loStore.getPlan().getPredecessors(op.get(0));
+ }
+ PhysicalOperator sortPhyOp = logToPhyMap.get(op.get(0));
+ // if this predecessor is a sort, get
+ // the sort info.
+ if(op.get(0) instanceof LOSort) {
+ sortInfo = ((POSort)sortPhyOp).getSortInfo();
+ }
+ store.setSortInfo(sortInfo);
} else {
int errCode = 2051;
String msg = "Did not find a predecessor for Store." ;
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Fri Oct 30 20:23:54 2009
@@ -276,7 +276,7 @@
}
public Result getNext(DataBag db) throws ExecException {
- Result ret = new Result();
+ Result ret = null;
DataBag tmpBag = BagFactory.getInstance().newDefaultBag();
for(ret = getNext(dummyTuple);ret.returnStatus!=POStatus.STATUS_EOP;ret=getNext(dummyTuple)){
if(ret.returnStatus == POStatus.STATUS_ERR) return ret;
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Fri Oct 30 20:23:54 2009
@@ -257,13 +257,13 @@
new ArrayList<PhysicalOperator>(inputs.size());
PhysicalOperator cloneOp = matches.get(op);
if (cloneOp == null) {
- String msg = "Unable to find clone for op " + cloneOp.name();
+ String msg = "Unable to find clone for op " + op.name();
throw new CloneNotSupportedException(msg);
}
for (PhysicalOperator iOp : inputs) {
PhysicalOperator cloneIOp = matches.get(iOp);
if (cloneIOp == null) {
- String msg = "Unable to find clone for op " + cloneIOp.name();
+ String msg = "Unable to find clone for op " + iOp.name();
throw new CloneNotSupportedException(msg);
}
newInputs.add(cloneIOp);
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java Fri Oct 30 20:23:54 2009
@@ -25,6 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -32,6 +33,7 @@
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.NonSpillableDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -120,13 +122,25 @@
keyLookup = lrKeyInfo.second;
}
+ private DataBag createDataBag() {
+ String bagType = null;
+ if (PigMapReduce.sJobConf != null) {
+ bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+ }
+
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ return new NonSpillableDataBag();
+ }
+ return new InternalCachedBag();
+ }
+
@Override
public Result getNext(Tuple t) throws ExecException {
int keyField = -1;
//Create numInputs bags
Object[] fields = new Object[mBags.length];
for (int i = 0; i < mBags.length; i++) {
- if (mBags[i]) fields[i] = new NonSpillableDataBag();
+ if (mBags[i]) fields[i] = createDataBag();
}
// For each indexed tup in the inp, split them up and place their
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Fri Oct 30 20:23:54 2009
@@ -24,6 +24,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -31,6 +32,8 @@
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalDistinctBag;
+import org.apache.pig.data.InternalSortedBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
@@ -73,9 +76,21 @@
@Override
public Result getNext(Tuple t) throws ExecException {
- if (!inputsAccumulated) {
- Result in = processInput();
- distinctBag = BagFactory.getInstance().newDistinctBag();
+ if (!inputsAccumulated) {
+ Result in = processInput();
+
+ // by default, we create an InternalDistinctBag, unless the user explicitly
+ // sets pig.cachedbag.distinct.type=default to use the old distinct bag
+ String bagType = null;
+ if (PigMapReduce.sJobConf != null) {
+ bagType = PigMapReduce.sJobConf.get("pig.cachedbag.distinct.type");
+ }
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ distinctBag = BagFactory.getInstance().newDistinctBag();
+ } else {
+ distinctBag = new InternalDistinctBag(3);
+ }
+
while (in.returnStatus != POStatus.STATUS_EOP) {
if (in.returnStatus == POStatus.STATUS_ERR) {
log.error("Error in reading from inputs");
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Fri Oct 30 20:23:54 2009
@@ -95,22 +95,6 @@
private transient BagFactory mBagFactory;
private boolean setUp;
- public POFRJoin(OperatorKey k) throws PlanException, ExecException {
- this(k,-1,null, null, null, null, -1);
- }
-
- public POFRJoin(OperatorKey k, int rp) throws PlanException, ExecException {
- this(k, rp, null, null, null, null, -1);
- }
-
- public POFRJoin(OperatorKey k, List<PhysicalOperator> inp) throws PlanException, ExecException {
- this(k, -1, inp, null, null, null, -1);
- }
-
- public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp) throws PlanException, ExecException {
- this(k,rp,inp,null, null, null, -1);
- }
-
public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, List<List<PhysicalPlan>> ppLists, List<List<Byte>> keyTypes, FileSpec[] replFiles, int fragment) throws ExecException{
super(k,rp,inp);