You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this rendering.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/01/22 22:17:22 UTC
svn commit: r614325 [4/6] - in /incubator/pig/branches/types: ./ lib/
scripts/ src/org/apache/pig/ src/org/apache/pig/builtin/
src/org/apache/pig/data/ src/org/apache/pig/impl/
src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/eval/
src/org/apac...
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileReader.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileReader.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileReader.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileReader.java Tue Jan 22 13:17:12 2008
@@ -15,75 +15,87 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.pig.impl.io;
+/*
+package org.apache.pig.impl.io;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Iterator;
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Iterator;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.mapreduceExec.PigMapReduce;
-import org.apache.pig.data.Datum;
-import org.apache.pig.data.DatumImpl;
+
+public class DataBagFileReader {
+ File store;
+
+ public DataBagFileReader(File f) throws IOException{
+ store = f;
+ }
+
+ public static int notifyInterval = 1000;
+ public int numNotifies;
+ private class myIterator implements Iterator<Tuple>{
+ DataInputStream in;
+ Tuple nextTuple;
+ int curCall;
+
+ public myIterator() throws IOException{
+ numNotifies = 0;
+ in = new DataInputStream(new BufferedInputStream(new FileInputStream(store)));
+ getNextTuple();
+ }
+
+ private void getNextTuple() throws IOException{
+ if (curCall < notifyInterval - 1)
+ curCall ++;
+ else{
+ if (PigMapReduce.reporter != null)
+ PigMapReduce.reporter.progress();
+ curCall = 0;
+ numNotifies ++;
+ }
-
-public class DataBagFileReader {
- File store;
-
- public DataBagFileReader(File f) throws IOException{
- store = f;
- }
-
- private class myIterator implements Iterator<Datum>{
- DataInputStream in;
- Datum nextDatum;
-
- public myIterator() throws IOException{
- in = new DataInputStream(new BufferedInputStream(new FileInputStream(store)));
- getNextDatum();
- }
-
- private void getNextDatum() throws IOException{
- try{
- /*
- nextDatum = new Datum();
- nextDatum.readFields(in);
- */
- nextDatum = DatumImpl.readDatum(in);
- } catch (EOFException e) {
- in.close();
- nextDatum = null;
- }
- }
-
- public boolean hasNext(){
- return nextDatum != null;
- }
-
- public Datum next(){
- Datum returnValue = nextDatum;
- if (returnValue!=null){
- try{
- getNextDatum();
- }catch (IOException e){
- throw new RuntimeException(e.getMessage());
- }
- }
- return returnValue;
- }
-
- public void remove(){
- throw new RuntimeException("Read only cursor");
- }
- }
-
- public Iterator<Datum> content() throws IOException{
- return new myIterator();
- }
-
- public void clear() throws IOException{
- store.delete();
- }
-}
+ try{
+ nextTuple = new Tuple();
+ nextTuple.readFields(in);
+ } catch (EOFException e) {
+ in.close();
+ nextTuple = null;
+ }
+ }
+
+ public boolean hasNext(){
+ return nextTuple != null;
+ }
+
+ public Tuple next(){
+ Tuple returnValue = nextTuple;
+ if (returnValue!=null){
+ try{
+ getNextTuple();
+ }catch (IOException e){
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+ return returnValue;
+ }
+
+ public void remove(){
+ throw new RuntimeException("Read only cursor");
+ }
+ }
+
+ public Iterator<Tuple> content() throws IOException{
+ return new myIterator();
+ }
+
+ public void clear() throws IOException{
+ store.delete();
+ }
+}
+*/
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileWriter.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileWriter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileWriter.java Tue Jan 22 13:17:12 2008
@@ -15,44 +15,56 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.pig.impl.io;
-
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.pig.data.Datum;
-
-
-
-public class DataBagFileWriter {
- File store;
- DataOutputStream out;
-
- public DataBagFileWriter(File store) throws IOException{
- this.store = store;
- out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(store)));
- }
-
- public void write(Datum d) throws IOException{
- d.write(out);
- }
+ /*
+package org.apache.pig.impl.io;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.data.Tuple;
+
+
+
+public class DataBagFileWriter {
+ File store;
+ DataOutputStream out;
+
+ public DataBagFileWriter(File store) throws IOException{
+ this.store = store;
+ out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(store)));
+ }
+
+ public void write(Tuple t) throws IOException{
+ t.write(out);
+ }
+
+ public long write(Iterator<Tuple> iter) throws IOException{
- public void write(Iterator<Datum> iter) throws IOException{
- while (iter.hasNext())
+ long initialSize = getFileLength();
+ while (iter.hasNext())
iter.next().write(out);
- }
-
- public void close() throws IOException{
- flush();
- out.close();
- }
+
+ return getFileLength() - initialSize;
+ }
- public void flush() throws IOException{
+ public long getFileLength() throws IOException{
out.flush();
+ return store.length();
}
-}
+
+ public void close() throws IOException{
+ flush();
+ out.close();
+ }
+
+ public void flush() throws IOException{
+ out.flush();
+ }
+
+}
+*/
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java Tue Jan 22 13:17:12 2008
@@ -32,6 +32,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.PigServer.ExecType;
import org.apache.pig.impl.PigContext;
@@ -158,7 +159,10 @@
Path paths[] = null;
if (fs.exists(path)) {
if (fs.isFile(path)) return fs.open(path);
- paths = fs.listPaths(path);
+ FileStatus fileStat[] = fs.listStatus(path);
+ paths = new Path[fileStat.length];
+ for (int i = 0; i < fileStat.length; i++)
+ paths[i] = fileStat[i].getPath();
} else {
// It might be a glob
if (!globMatchesFiles(path, paths, fs)) throw new IOException(path + " does not exist");
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/PigFile.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/PigFile.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/PigFile.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/PigFile.java Tue Jan 22 13:17:12 2008
@@ -27,7 +27,6 @@
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.Datum;
import org.apache.pig.impl.PigContext;
@@ -45,8 +44,7 @@
}
public DataBag load(LoadFunc lfunc, PigContext pigContext) throws IOException {
- DataBag content =
- BagFactory.getInstance().getNewBag(Datum.DataType.TUPLE);
+ DataBag content = BagFactory.getInstance().newDefaultBag();
InputStream is = FileLocalizer.open(file, pigContext);
lfunc.bindTo(file, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
Tuple f = null;
@@ -60,8 +58,8 @@
public void store(DataBag data, StoreFunc sfunc, PigContext pigContext) throws IOException {
BufferedOutputStream bos = new BufferedOutputStream(FileLocalizer.create(file, append, pigContext));
sfunc.bindTo(bos);
- for (Iterator<Datum> it = data.content(); it.hasNext();) {
- Tuple row = (Tuple)it.next();
+ for (Iterator<Tuple> it = data.iterator(); it.hasNext();) {
+ Tuple row = it.next();
sfunc.putNext(row);
}
sfunc.finish();
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Tue Jan 22 13:17:12 2008
@@ -21,8 +21,8 @@
import java.util.ArrayList;
import java.util.List;
-import org.apache.pig.data.Datum;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.eval.EvalSpec;
import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -30,136 +30,153 @@
-public class LOCogroup extends LogicalOperator{
- private static final long serialVersionUID = 1L;
-
- protected ArrayList<EvalSpec> specs;
-
- public LOCogroup(List<LogicalOperator> inputs, ArrayList<EvalSpec> specs) {
- super(inputs);
+public class LOCogroup extends LogicalOperator {
+ private static final long serialVersionUID = 1L;
+
+ protected ArrayList<EvalSpec> specs;
+
+ public LOCogroup(List<LogicalOperator> inputs,
+ ArrayList<EvalSpec> specs) {
+ super(inputs);
this.specs = specs;
getOutputType();
}
-
- @Override
- public String name() {
- return "CoGroup";
- }
- @Override
- public String arguments() {
- StringBuffer sb = new StringBuffer();
-
+
+ @Override
+ public String name() {
+ if (inputs.size() == 1) return "Group";
+ else return "CoGroup";
+ }
+ @Override
+ public String arguments() {
+ StringBuffer sb = new StringBuffer();
+
for (int i = 0; i < specs.size(); i++) {
sb.append(specs.get(i));
- if (i+1 < specs.size()) sb.append(", ");
+ if (i + 1 < specs.size())
+ sb.append(", ");
}
-
+
return sb.toString();
- }
-
- public static Datum[] getGroupAndTuple(Datum d){
- if (!(d instanceof Tuple)){
- throw new RuntimeException("Internal Error: Evaluation of group expression did not return a tuple");
- }
- Tuple output = (Tuple)d;
- if (output.arity() < 2){
- throw new RuntimeException("Internal Error: Evaluation of group expression returned a tuple with <2 fields");
- }
-
- Datum[] groupAndTuple = new Datum[2];
- if (output.arity() == 2){
- groupAndTuple[0] = output.getField(0);
- groupAndTuple[1] = output.getField(1);
- }else{
- Tuple group = new Tuple();
- for (int j=0; j<output.arity()-1; j++){
- group.appendField(output.getField(j));
- }
- groupAndTuple[0] = group;
- groupAndTuple[1] = output.getField(output.arity()-1);
- }
- return groupAndTuple;
- }
-
+ }
+
+ /**
+ * Splits the result of evaluating a group expression into its group key
+ * and its payload.
+ *
+ * The value must be a Tuple with at least two fields. With exactly two
+ * fields, field 0 is the key and field 1 the payload. With more, every
+ * field except the last is copied into a new tuple that becomes the key,
+ * and the last field is the payload.
+ *
+ * @param d result of evaluating the group expression
+ * @return a two-element array: [0] = group key, [1] = payload
+ * @throws RuntimeException if d is not a Tuple, has fewer than two
+ * fields, or a field cannot be read/written (IOException is wrapped)
+ */
+ public static Object[] getGroupAndTuple(Object d) {
+ if (!(d instanceof Tuple)) {
+ throw new RuntimeException
+ ("Internal Error: Evaluation of group expression did not return a tuple");
+ }
+ Tuple output = (Tuple) d;
+ int arity = output.size();
+ if (arity < 2) {
+ throw new RuntimeException
+ ("Internal Error: Evaluation of group expression returned a tuple with <2 fields");
+ }
+
+ Object[] groupAndTuple = new Object[2];
+ try {
+ if (arity == 2) {
+ groupAndTuple[0] = output.get(0);
+ groupAndTuple[1] = output.get(1);
+ } else {
+ // The key holds every field except the last, so it must be sized
+ // arity - 1. Sizing it to arity left an extra, never-set trailing
+ // field in every composite group key.
+ int keyArity = arity - 1;
+ Tuple group = TupleFactory.getInstance().newTuple(keyArity);
+ for (int j = 0; j < keyArity; j++) {
+ group.set(j, output.get(j));
+ }
+ groupAndTuple[0] = group;
+ groupAndTuple[1] = output.get(keyArity);
+ }
+ } catch (java.io.IOException e) {
+ // Fully qualified so this compiles whether or not the file imports it.
+ throw new RuntimeException(e);
+ }
+ return groupAndTuple;
+ }
+
@Override
- public TupleSchema outputSchema() {
- if (schema == null){
- schema = new TupleSchema();
-
-
- Schema groupElementSchema = specs.get(0).getOutputSchemaForPipe(getInputs().get(0).outputSchema());
- if (groupElementSchema == null){
- groupElementSchema = new TupleSchema();
- groupElementSchema.setAlias("group");
- }else{
-
- if (!(groupElementSchema instanceof TupleSchema))
- throw new RuntimeException("Internal Error: Schema of group expression was atomic");
- List<Schema> fields = ((TupleSchema)groupElementSchema).getFields();
-
- if (fields.size() < 2)
- throw new RuntimeException("Internal Error: Schema of group expression retured <2 fields");
-
- if (fields.size() == 2){
- groupElementSchema = fields.get(0);
- groupElementSchema.removeAllAliases();
- groupElementSchema.setAlias("group");
- }else{
- groupElementSchema = new TupleSchema();
- groupElementSchema.setAlias("group");
-
- for (int i=0; i<fields.size()-1; i++){
- ((TupleSchema)groupElementSchema).add(fields.get(i));
- }
- }
-
- }
-
- schema.add(groupElementSchema);
-
- for (LogicalOperator lo : getInputs()) {
- TupleSchema inputSchema = lo.outputSchema();
- if (inputSchema == null)
- inputSchema = new TupleSchema();
- schema.add(inputSchema);
- }
- }
-
- schema.setAlias(alias);
+ public TupleSchema outputSchema() {
+ if (schema == null) {
+ schema = new TupleSchema();
+
+
+ Schema groupElementSchema =
+ specs.get(0).getOutputSchemaForPipe(getInputs().get(0).
+ outputSchema());
+ if (groupElementSchema == null) {
+ groupElementSchema = new TupleSchema();
+ groupElementSchema.setAlias("group");
+ } else {
+
+ if (!(groupElementSchema instanceof TupleSchema))
+ throw new RuntimeException
+ ("Internal Error: Schema of group expression was atomic");
+ List<Schema> fields =
+ ((TupleSchema) groupElementSchema).getFields();
+
+ if (fields.size() < 2)
+ throw new RuntimeException
+ ("Internal Error: Schema of group expression retured <2 fields");
+
+ if (fields.size() == 2) {
+ groupElementSchema = fields.get(0);
+ groupElementSchema.removeAllAliases();
+ groupElementSchema.setAlias("group");
+ } else {
+ groupElementSchema = new TupleSchema();
+ groupElementSchema.setAlias("group");
+
+ for (int i = 0; i < fields.size() - 1; i++) {
+ ((TupleSchema) groupElementSchema).add(fields.get(i));
+ }
+ }
+
+ }
+
+ schema.add(groupElementSchema);
+
+ for (LogicalOperator lo:getInputs()) {
+ TupleSchema inputSchema = lo.outputSchema();
+ if (inputSchema == null)
+ inputSchema = new TupleSchema();
+ schema.add(inputSchema);
+ }
+ }
+
+ schema.setAlias(alias);
return schema;
}
-
+
@Override
- public int getOutputType(){
- int outputType = FIXED;
- for (int i=0; i<getInputs().size(); i++){
- switch (getInputs().get(i).getOutputType()){
- case FIXED:
- continue;
- case MONOTONE:
- outputType = AMENDABLE;
- break;
- case AMENDABLE:
- default:
- throw new RuntimeException("Can't feed a cogroup into another in the streaming case");
- }
- }
- return outputType;
+ public int getOutputType() {
+ int outputType = FIXED;
+ for (int i = 0; i < getInputs().size(); i++) {
+ switch (getInputs().get(i).getOutputType()) {
+ case FIXED:
+ continue;
+ case MONOTONE:
+ outputType = AMENDABLE;
+ break;
+ case AMENDABLE:
+ default:
+ throw new RuntimeException
+ ("Can't feed a cogroup into another in the streaming case");
+ }
+ }
+ return outputType;
}
@Override
- public List<String> getFuncs() {
+ public List<String> getFuncs() {
List<String> funcs = super.getFuncs();
- for (EvalSpec spec: specs) {
+ for (EvalSpec spec:specs) {
funcs.addAll(spec.getFuncs());
}
return funcs;
}
- public ArrayList<EvalSpec> getSpecs() {
- return specs;
- }
+ public ArrayList<EvalSpec> getSpecs() {
+ return specs;
+ }
-
+ public void visit(LOVisitor v) {
+ v.visitCogroup(this);
+ }
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEval.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEval.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEval.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEval.java Tue Jan 22 13:17:12 2008
@@ -24,34 +24,36 @@
-public class LOEval extends LogicalOperator{
- private static final long serialVersionUID = 1L;
-
- protected EvalSpec spec;
+public class LOEval extends LogicalOperator {
+ private static final long serialVersionUID = 1L;
+
+ protected EvalSpec spec;
public LOEval(LogicalOperator input, EvalSpec specIn) {
- super(input);
+ super(input);
spec = specIn;
getOutputType();
}
@Override
- public String name() {
- return "Eval";
+ public String name() {
+ return "Foreach";
}
@Override
- public String arguments() {
+ public String arguments() {
return spec.toString();
}
@Override
- public TupleSchema outputSchema() {
+ public TupleSchema outputSchema() {
if (schema == null) {
//System.out.println("LOEval input: " + inputs[0].outputSchema());
//System.out.println("LOEval spec: " + spec);
- schema = (TupleSchema)spec.getOutputSchemaForPipe(getInputs().get(0).outputSchema());
-
+ schema =
+ (TupleSchema) spec.getOutputSchemaForPipe(getInputs().get(0).
+ outputSchema());
+
//System.out.println("LOEval output: " + schema);
}
schema.setAlias(alias);
@@ -59,7 +61,7 @@
}
@Override
- public int getOutputType() {
+ public int getOutputType() {
switch (getInputs().get(0).getOutputType()) {
case FIXED:
return FIXED;
@@ -72,13 +74,17 @@
}
@Override
- public List<String> getFuncs() {
+ public List<String> getFuncs() {
List<String> funcs = super.getFuncs();
funcs.addAll(spec.getFuncs());
return funcs;
}
- public EvalSpec getSpec() {
- return spec;
+ public EvalSpec getSpec() {
+ return spec;
+ }
+
+ public void visit(LOVisitor v) {
+ v.visitEval(this);
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java Tue Jan 22 13:17:12 2008
@@ -29,70 +29,67 @@
public class LOLoad extends LogicalOperator {
- private static final long serialVersionUID = 1L;
-
- protected FileSpec inputFileSpec;
-
- protected int outputType = FIXED;
+ private static final long serialVersionUID = 1L;
+ protected FileSpec inputFileSpec;
- public LOLoad(FileSpec inputFileSpec) throws IOException, ParseException{
- super();
- this.inputFileSpec = inputFileSpec;
- try
- {
- LoadFunc storageFunc = (LoadFunc) PigContext.instantiateFuncFromSpec(inputFileSpec.getFuncSpec());
- }
- catch (IOException e)
- {
- Throwable cause = e.getCause();
- while (cause != null && cause.getClass().getName() != "java.lang.ClassNotFoundException")
- {
- System.out.println("cause = " + cause.getClass().getName());
- cause = cause.getCause();
- }
-
- if (cause != null)
- {
- throw new ParseException("Load function " + inputFileSpec.getFuncSpec() + " not found");
- }
- else
- {
- throw e;
- }
-
- }
+ protected int outputType = FIXED;
+
+
+ /**
+ * Creates a load operator for the given file spec.
+ *
+ * The load function named by the spec is instantiated eagerly, purely to
+ * validate the spec; the instance itself is discarded.
+ *
+ * @param inputFileSpec file and load-function spec to read from
+ * @throws ParseException if instantiation failed because the load
+ * function class could not be found (a ClassNotFoundException somewhere
+ * in the cause chain); the original IOException is attached as its cause
+ * @throws IOException for any other instantiation failure
+ */
+ public LOLoad(FileSpec inputFileSpec) throws IOException, ParseException {
+ super();
+ this.inputFileSpec = inputFileSpec;
+ try {
+ // Validation only: fails if the spec cannot be instantiated or is
+ // not a LoadFunc.
+ LoadFunc storageFunc =
+ (LoadFunc) PigContext.instantiateFuncFromSpec(inputFileSpec.
+ getFuncSpec());
+ } catch(IOException e) {
+ // Walk the cause chain looking for a missing class. Use instanceof
+ // rather than comparing class-name Strings with != (reference
+ // equality), which only worked if both Strings happened to be interned.
+ Throwable cause = e.getCause();
+ while (cause != null && !(cause instanceof ClassNotFoundException)) {
+ cause = cause.getCause();
+ }
+ if (cause != null) {
+ ParseException pe = new ParseException("Load function " +
+ inputFileSpec.getFuncSpec() +
+ " not found");
+ // Keep the underlying failure for diagnosis.
+ pe.initCause(e);
+ throw pe;
+ }
+ throw e;
+ }
//TODO: Handle Schemas defined by Load Functions
schema = new TupleSchema();
}
@Override
- public String name() {
+ public String name() {
return "Load";
}
-
- public FileSpec getInputFileSpec(){
- return inputFileSpec;
+
+ public FileSpec getInputFileSpec() {
+ return inputFileSpec;
}
-
+
public void setInputFileSpec(FileSpec spec) {
- inputFileSpec = spec;
+ inputFileSpec = spec;
}
-
- @Override
- public String arguments() {
- return inputFileSpec.toString();
+
+ @Override
+ public String arguments() {
+ return inputFileSpec.toString();
}
@Override
- public TupleSchema outputSchema() {
- schema.setAlias(alias);
+ public TupleSchema outputSchema() {
+ schema.setAlias(alias);
return this.schema;
}
@Override
- public int getOutputType() {
+ public int getOutputType() {
return outputType;
}
@@ -104,18 +101,22 @@
}
@Override
- public String toString() {
- StringBuffer result = new StringBuffer(super.toString());
- result.append(" (outputType: ");
- result.append(outputType);
- result.append(')');
- return result.toString();
- }
+ public String toString() {
+ StringBuffer result = new StringBuffer(super.toString());
+ result.append(" (outputType: ");
+ result.append(outputType);
+ result.append(')');
+ return result.toString();
+ }
- @Override
- public List<String> getFuncs() {
+ @Override
+ public List<String> getFuncs() {
List<String> funcs = super.getFuncs();
funcs.add(inputFileSpec.getFuncName());
return funcs;
}
+
+ public void visit(LOVisitor v) {
+ v.visitLoad(this);
+ }
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORead.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORead.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORead.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORead.java Tue Jan 22 13:17:12 2008
@@ -24,78 +24,80 @@
public class LORead extends LogicalOperator {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
-
- protected IntermedResult readFrom = null;
- boolean readsFromSplit = false;
-
- @Override
- public String toString() {
- StringBuffer result = new StringBuffer(super.toString());
- result.append(" (readsFromSplit: ");
- result.append(readsFromSplit);
- result.append(')');
- return result.toString();
- }
+ protected IntermedResult readFrom = null;
+ boolean readsFromSplit = false;
+ @Override
+ public String toString() {
+ StringBuffer result = new StringBuffer(super.toString());
+ result.append(" (readsFromSplit: ");
+ result.append(readsFromSplit);
+ result.append(')');
+ return result.toString();
+ }
- //Since intermed result may have multiple outputs, which output do I read?
-
+ //Since intermed result may have multiple outputs, which output do I read?
public int splitOutputToRead = 0;
-
- public LORead(IntermedResult readFromIn) {
- super();
+
+ public LORead(IntermedResult readFromIn) {
+ super();
readFrom = readFromIn;
- }
-
- public LORead(IntermedResult readFromIn, int outputToRead) {
- super();
- readsFromSplit = true;
- this.splitOutputToRead = outputToRead;
+ }
+
+ public LORead(IntermedResult readFromIn, int outputToRead) {
+ super();
+ readsFromSplit = true;
+ this.splitOutputToRead = outputToRead;
readFrom = readFromIn;
- }
-
- public boolean readsFromSplit(){
- return readsFromSplit;
- }
-
- @Override
- public String name() {
- return "Read";
- }
- @Override
- public String arguments() {
- return alias;
- }
-
+ }
+
+ public boolean readsFromSplit() {
+ return readsFromSplit;
+ }
+
+ @Override
+ public String name() {
+ return "Read";
+ }
+ @Override
+ public String arguments() {
+ return alias;
+ }
+
@Override
- public TupleSchema outputSchema() {
- if (schema == null) {
- if (readFrom.lp != null && readFrom.lp.root != null && readFrom.lp.root.outputSchema() != null) {
+ public TupleSchema outputSchema() {
+ if (schema == null) {
+ if (readFrom.lp != null && readFrom.lp.root != null
+ && readFrom.lp.root.outputSchema() != null) {
schema = readFrom.lp.root.outputSchema().copy();
} else {
schema = new TupleSchema();
}
- }
-
- schema.removeAllAliases();
+ }
+
+ schema.removeAllAliases();
schema.setAlias(alias);
-
+
return schema;
}
-
-
-
- @Override
- public int getOutputType(){
- return readFrom.getOutputType();
- }
- public IntermedResult getReadFrom() {
- return readFrom;
+
+
+ @Override
+ public int getOutputType() {
+ return readFrom.getOutputType();
+ }
+
+ public IntermedResult getReadFrom() {
+ return readFrom;
+ }
+
+ public void visit(LOVisitor v) {
+ v.visitRead(this);
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java Tue Jan 22 13:17:12 2008
@@ -15,64 +15,67 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.pig.impl.logicalLayer;
-
-
+package org.apache.pig.impl.logicalLayer;
+
+
import org.apache.pig.impl.eval.EvalSpec;
import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
-
-public class LOSort extends LogicalOperator {
- private static final long serialVersionUID = 1L;
- private EvalSpec sortSpec;
-
-
- protected EvalSpec spec;
-
- public EvalSpec getSpec() {
- return spec;
- }
-
-
-
- public LOSort( LogicalOperator input, EvalSpec sortSpec){
- super(input);
- this.sortSpec = sortSpec;
- getOutputType();
- }
-
- @Override
- public String name() {
- return "SORT";
- }
-
- @Override
- public String arguments() {
- return sortSpec.toString();
- }
-
- @Override
- public int getOutputType() {
- switch(getInputs().get(0).getOutputType()){
- case FIXED:
- return FIXED;
- default:
- throw new RuntimeException("Blocking operator such as sort cannot handle streaming input");
- }
- }
-
- @Override
- public TupleSchema outputSchema() {
- if (schema== null)
- schema = getInputs().get(0).outputSchema().copy();
-
- schema.setAlias(alias);
- return schema;
-
- }
-
- public EvalSpec getSortSpec() {
- return sortSpec;
- }
-
-}
+
+/**
+ * Logical operator that sorts its single input by a sort EvalSpec.
+ * Sorting is blocking, so only FIXED input is accepted; any other input
+ * output type is rejected with a RuntimeException.
+ */
+public class LOSort extends LogicalOperator {
+ private static final long serialVersionUID = 1L;
+
+ /** Sort key specification supplied to the constructor. */
+ private EvalSpec sortSpec;
+
+ /**
+ * NOTE(review): never assigned within this class, so getSpec() returns
+ * null unless something else sets this protected field. The sort key is
+ * available through getSortSpec().
+ */
+ protected EvalSpec spec;
+
+ public LOSort(LogicalOperator input, EvalSpec sortSpec) {
+ super(input);
+ this.sortSpec = sortSpec;
+ // Result is discarded; the call validates the input output type.
+ getOutputType();
+ }
+
+ /** @return the protected spec field (see the note on the field). */
+ public EvalSpec getSpec() {
+ return spec;
+ }
+
+ /** @return the sort key specification given to the constructor. */
+ public EvalSpec getSortSpec() {
+ return sortSpec;
+ }
+
+ @Override
+ public String name() {
+ return "SORT";
+ }
+
+ @Override
+ public String arguments() {
+ return sortSpec.toString();
+ }
+
+ /**
+ * @return FIXED when the input is FIXED
+ * @throws RuntimeException for any other (streaming) input type
+ */
+ @Override
+ public int getOutputType() {
+ int inputType = getInputs().get(0).getOutputType();
+ if (inputType == FIXED) {
+ return FIXED;
+ }
+ throw new RuntimeException
+ ("Blocking operator such as sort cannot handle streaming input");
+ }
+
+ /**
+ * Lazily copies the input's schema on first call, then stamps this
+ * operator's alias on it every time.
+ */
+ @Override
+ public TupleSchema outputSchema() {
+ if (schema == null) {
+ TupleSchema inputSchema = getInputs().get(0).outputSchema();
+ schema = inputSchema.copy();
+ }
+ schema.setAlias(alias);
+ return schema;
+ }
+
+ public void visit(LOVisitor v) {
+ v.visitSort(this);
+ }
+
+}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java Tue Jan 22 13:17:12 2008
@@ -15,41 +15,44 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.pig.impl.logicalLayer;
-
-import java.util.ArrayList;
-import java.util.List;
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.ArrayList;
+import java.util.List;
import org.apache.pig.impl.eval.cond.Cond;
import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
-
-
-public class LOSplit extends LogicalOperator {
- private static final long serialVersionUID = 1L;
-
- List<Cond> conditions = new ArrayList<Cond>();
-
- public LOSplit(LogicalOperator input){
- super(input);
- }
-
- public void addCond(Cond cond){
- conditions.add(cond);
- }
-
- @Override
- public int getOutputType(){
- return getInputs().get(0).getOutputType();
- }
-
- public ArrayList<Cond> getConditions(){
- return new ArrayList<Cond>(conditions);
- }
-
- @Override
- public TupleSchema outputSchema(){
- return getInputs().get(0).outputSchema().copy();
- }
-
-
-}
+
+
+/**
+ * Logical operator for SPLIT. It records the conditions registered through
+ * addCond(), and passes its single input's output type and (a copy of its)
+ * schema straight through.
+ */
+public class LOSplit extends LogicalOperator {
+ private static final long serialVersionUID = 1L;
+
+ /** Conditions in the order they were added. */
+ List<Cond> conditions = new ArrayList<Cond>();
+
+ public LOSplit(LogicalOperator input) {
+ super(input);
+ }
+
+ /** Appends a condition to this split. */
+ public void addCond(Cond cond) {
+ conditions.add(cond);
+ }
+
+ /** @return a fresh copy of the conditions, in insertion order. */
+ public ArrayList<Cond> getConditions() {
+ ArrayList<Cond> snapshot = new ArrayList<Cond>(conditions.size());
+ snapshot.addAll(conditions);
+ return snapshot;
+ }
+
+ /** @return the output type of the single input, unchanged. */
+ @Override
+ public int getOutputType() {
+ LogicalOperator source = getInputs().get(0);
+ return source.getOutputType();
+ }
+
+ /** @return a copy of the single input's schema. */
+ @Override
+ public TupleSchema outputSchema() {
+ LogicalOperator source = getInputs().get(0);
+ return source.outputSchema().copy();
+ }
+
+ public void visit(LOVisitor v) {
+ v.visitSplit(this);
+ }
+
+}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStore.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStore.java Tue Jan 22 13:17:12 2008
@@ -27,59 +27,62 @@
public class LOStore extends LogicalOperator {
- private static final long serialVersionUID = 1L;
-
- protected FileSpec outputFileSpec;
+ private static final long serialVersionUID = 1L;
- protected boolean append;
-
+ protected FileSpec outputFileSpec;
- public LOStore(LogicalOperator input, FileSpec fileSpec, boolean append) throws IOException{
- super(input);
+ protected boolean append;
+
+
+    /**
+     * Creates a store of {@code input} into {@code fileSpec}.
+     *
+     * @param input    operator whose output is stored
+     * @param fileSpec destination and store-function specification
+     * @param append   value later reported by {@link #isAppend()}
+     * @throws IOException if the function named by {@code fileSpec} cannot be
+     *         instantiated as a {@link StoreFunc}; the underlying exception
+     *         is attached as the cause
+     */
+    public LOStore(LogicalOperator input,
+                   FileSpec fileSpec,
+                   boolean append) throws IOException {
+        super(input);
this.outputFileSpec = fileSpec;
this.append = append;
//See if the store function spec is valid
- try{
- StoreFunc StoreFunc = (StoreFunc) PigContext.instantiateFuncFromSpec(fileSpec.getFuncSpec());
- }catch (Exception e){
- IOException ioe = new IOException(e.getMessage());
- ioe.setStackTrace(e.getStackTrace());
- throw ioe;
- }
-
- getOutputType();
+        try {
+            // Instantiated (and cast) only to validate the spec; the
+            // instance itself is discarded.
+            StoreFunc validated =
+                (StoreFunc) PigContext.instantiateFuncFromSpec(
+                    fileSpec.getFuncSpec());
+        } catch (Exception e) {
+            IOException ioe = new IOException(e.getMessage());
+            // Keep the original exception reachable for diagnostics.
+            ioe.initCause(e);
+            ioe.setStackTrace(e.getStackTrace());
+            throw ioe;
+        }
+        // Return value is discarded; presumably called to validate the
+        // input's output type early (TODO confirm).
+        getOutputType();
}
-
-
- public FileSpec getOutputFileSpec(){
- return outputFileSpec;
- }
-
-
- @Override
- public String toString() {
-
- StringBuffer result = new StringBuffer(super.toString());
- result.append(" (append: ");
- result.append(append);
- result.append(')');
- return result.toString();
- }
- @Override
- public String name() {
+ public FileSpec getOutputFileSpec() {
+ return outputFileSpec;
+ }
+
+
+ @Override
+ public String toString() {
+ StringBuffer result = new StringBuffer(super.toString());
+ result.append(" (append: ");
+ result.append(append);
+ result.append(')');
+ return result.toString();
+ }
+
+
+ @Override
+ public String name() {
return "Store";
}
@Override
- public TupleSchema outputSchema() {
- throw new RuntimeException("Internal error: Asking for schema of a store operator.");
+ public TupleSchema outputSchema() {
+ throw new
+ RuntimeException
+ ("Internal error: Asking for schema of a store operator.");
}
@Override
- public int getOutputType() {
+ public int getOutputType() {
switch (getInputs().get(0).getOutputType()) {
case FIXED:
return FIXED;
@@ -91,14 +94,18 @@
}
@Override
- public List<String> getFuncs() {
+ public List<String> getFuncs() {
List<String> funcs = super.getFuncs();
funcs.add(outputFileSpec.getFuncName());
return funcs;
}
- public boolean isAppend() {
- return append;
+ public boolean isAppend() {
+ return append;
+ }
+
+ public void visit(LOVisitor v) {
+ v.visitStore(this);
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java Tue Jan 22 13:17:12 2008
@@ -24,52 +24,59 @@
public class LOUnion extends LogicalOperator {
- private static final long serialVersionUID = 1L;
-
+ private static final long serialVersionUID = 1L;
+
public LOUnion(List<LogicalOperator> inputsIn) {
- super(inputsIn);
+ super(inputsIn);
}
-
+
@Override
- public String name() {
+ public String name() {
return "Union";
}
@Override
- public TupleSchema outputSchema() {
- if (schema == null){
- TupleSchema longest = getInputs().get(0).outputSchema();
- int current = 0;
- for (LogicalOperator lo : getInputs()) {
- if (lo != null && lo.outputSchema() != null && lo.outputSchema().numFields() > current) {
- longest = lo.outputSchema();
- current = longest.numFields();
- }
- }
- schema = longest.copy();
- }
-
- schema.setAlias(alias);
+ public TupleSchema outputSchema() {
+ if (schema == null) {
+ TupleSchema longest = getInputs().get(0).outputSchema();
+ int current = 0;
+ for (LogicalOperator lo:getInputs()) {
+ if (lo != null && lo.outputSchema() != null
+ && lo.outputSchema().numFields() > current) {
+ longest = lo.outputSchema();
+ current = longest.numFields();
+ }
+ }
+ schema = longest.copy();
+ }
+
+ schema.setAlias(alias);
return schema;
}
-
- @Override
- public int getOutputType(){
- int outputType = FIXED;
- for (int i=0; i<getInputs().size(); i++){
- switch (getInputs().get(i).getOutputType()){
- case FIXED:
- continue;
- case MONOTONE:
- if (outputType == FIXED)
- outputType = MONOTONE;
- continue;
- case AMENDABLE:
- outputType = AMENDABLE;
- default:
- throw new RuntimeException("Invalid input type to the UNION operator");
- }
- }
- return outputType;
+
+    /**
+     * Combines the output types of all inputs:
+     * FIXED if every input is FIXED, MONOTONE if at least one input is
+     * MONOTONE and none is AMENDABLE, AMENDABLE if any input is AMENDABLE.
+     *
+     * @throws RuntimeException if an input reports any other type
+     *         (for example the reserved UPDATABLE)
+     */
+    @Override
+    public int getOutputType() {
+        int outputType = FIXED;
+        for (int i = 0; i < getInputs().size(); i++) {
+            switch (getInputs().get(i).getOutputType()) {
+            case FIXED:
+                continue;
+            case MONOTONE:
+                // Only upgrade; never downgrade an AMENDABLE result.
+                if (outputType == FIXED) {
+                    outputType = MONOTONE;
+                }
+                continue;
+            case AMENDABLE:
+                // AMENDABLE dominates. The 'continue' is required so this
+                // case does not fall through into default and throw.
+                outputType = AMENDABLE;
+                continue;
+            default:
+                throw new RuntimeException("Invalid input type to the UNION operator");
+            }
+        }
+        return outputType;
+    }
+
+ public void visit(LOVisitor v) {
+ v.visitUnion(this);
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java Tue Jan 22 13:17:12 2008
@@ -27,58 +27,55 @@
abstract public class LogicalOperator implements Serializable {
- public String alias = null;
-
- public static final int FIXED = 1;
- public static final int MONOTONE = 2;
- public static final int UPDATABLE = 3; // Reserved for future use
- public static final int AMENDABLE = 4;
-
- protected int requestedParallelism = -1;
- protected TupleSchema schema = null;
- protected List<LogicalOperator> inputs;
-
- protected LogicalOperator(){
- this.inputs = new ArrayList<LogicalOperator>();
- }
-
- protected LogicalOperator(List<LogicalOperator> inputs) {
- this.inputs = inputs;
- }
-
- protected LogicalOperator(LogicalOperator input) {
- this.inputs = new ArrayList<LogicalOperator>();
- inputs.add(input);
- }
-
- public String getAlias() {
- return alias;
- }
-
- public void setAlias(String newAlias) {
- alias = newAlias;
- }
-
- public int getRequestedParallelism() {
- return requestedParallelism;
- }
-
- public void setRequestedParallelism(int newRequestedParallelism) {
- requestedParallelism = newRequestedParallelism;
- }
-
- @Override
- public String toString() {
- StringBuffer result = new StringBuffer(super.toString());
- result.append(" (alias: ");
- result.append(alias);
- result.append(", requestedParallelism: ");
- result.append(requestedParallelism);
- result.append(')');
- return result.toString();
- }
+ public String alias = null;
- public abstract TupleSchema outputSchema();
+ public static final int FIXED = 1;
+ public static final int MONOTONE = 2;
+ public static final int UPDATABLE = 3; // Reserved for future use
+ public static final int AMENDABLE = 4;
+
+ protected int requestedParallelism = -1;
+ protected TupleSchema schema = null;
+ protected List<LogicalOperator> inputs;
+
+    /** Creates an operator with an empty, mutable input list. */
+    protected LogicalOperator() {
+        this(new ArrayList<LogicalOperator>());
+    }
+
+    /**
+     * Creates an operator over {@code inputs}. The list is stored as given
+     * (not copied), so it is shared with the caller.
+     */
+    protected LogicalOperator(List<LogicalOperator> inputs) {
+        this.inputs = inputs;
+    }
+
+    /** Creates an operator whose input list holds exactly {@code input}. */
+    protected LogicalOperator(LogicalOperator input) {
+        this(new ArrayList<LogicalOperator>());
+        this.inputs.add(input);
+    }
+
+ public String getAlias() {
+ return alias;
+ }
+
+ public void setAlias(String newAlias) {
+ alias = newAlias;
+ }
+
+ public int getRequestedParallelism() {
+ return requestedParallelism;
+ }
+
+ public void setRequestedParallelism(int newRequestedParallelism) {
+ requestedParallelism = newRequestedParallelism;
+ }
+
+    /**
+     * @return the default {@link Object#toString()} text followed by
+     *         {@code " (alias: <alias>, requestedParallelism: <n>)"}
+     */
+    @Override
+    public String toString() {
+        return new StringBuffer(super.toString())
+            .append(" (alias: ").append(alias)
+            .append(", requestedParallelism: ").append(requestedParallelism)
+            .append(')')
+            .toString();
+    }
+
+ public abstract TupleSchema outputSchema();
public String name() {
return "ROOT";
@@ -99,10 +96,17 @@
}
return funcs;
}
-
+
public abstract int getOutputType();
- public void setSchema(TupleSchema schema) {
- this.schema = schema;
- }
+ public void setSchema(TupleSchema schema) {
+ this.schema = schema;
+ }
+
+ /**
+ * Visit all of the logical operators in a tree, starting with this
+ * one.
+ * @param v LOVisitor to visit this logical plan with.
+ */
+ public abstract void visit(LOVisitor v);
}
Propchange: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Jan 22 13:17:12 2008
@@ -0,0 +1,8 @@
+
+TokenMgrError.java
+Token.java
+SimpleNode.java
+SimpleCharStream.java
+ParseException.java
+Node.java
+JJTQueryParserState.java
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Jan 22 13:17:12 2008
@@ -180,7 +180,8 @@
if (spec instanceof CompositeEvalSpec)
spec = ((CompositeEvalSpec)spec).getSpecs().get(0);
if ( spec instanceof ConstSpec ||
- (spec instanceof FuncEvalSpec && ((FuncEvalSpec)spec).getReturnType() == DataAtom.class))
+ (spec instanceof FuncEvalSpec &&
+ DataType.isAtomic(DataType.findType(((FuncEvalSpec)spec).getReturnType()))))
isAtomic = true;
else if (spec instanceof FuncEvalSpec)
isAtomic = false;
@@ -515,7 +516,7 @@
}
-LogicalOperator OrderClause() : {LogicalOperator op; EvalSpec sortSpec = null; ProjectSpec projSpec;}
+LogicalOperator OrderClause() : {LogicalOperator op; EvalSpec sortSpec = null; ProjectSpec projSpec; String funcName;}
{
(
op = NestedExpr() <BY>
@@ -530,6 +531,17 @@
)
| (sortSpec = Star() {sortSpec = new GenerateSpec(sortSpec);})
)
+ (
+ <USING> funcName = QualifiedFunction()
+ {
+ try {
+ sortSpec.setComparatorName(funcName);
+ } catch (Exception e){
+ throw new ParseException(e.getMessage());
+ }
+ }
+ )?
+
)
{
return new LOSort(op, sortSpec);
@@ -607,13 +619,23 @@
}
EvalSpec NestedSortOrArrange(Schema over, Map<String, EvalSpec> specs):
-{EvalSpec sortSpec; ProjectSpec projSpec; EvalSpec item; Schema subSchema = null; Token t;}
+{EvalSpec sortSpec; ProjectSpec projSpec; EvalSpec item; Schema subSchema = null; Token t; String funcName;}
{
(
( t = <ORDER> | t = <ARRANGE> )
item = BaseEvalSpec(over,specs) { subSchema = item.getOutputSchemaForPipe(over); }
- <BY> ( (projSpec = SimpleProj(subSchema) {sortSpec = projSpec;})
- | sortSpec = Star() )
+ <BY> ( (projSpec = SimpleProj(subSchema) {sortSpec = projSpec;})
+ | sortSpec = Star() )
+ (
+ <USING> funcName = QualifiedFunction()
+ {
+ try {
+ sortSpec.setComparatorName(funcName);
+ } catch (Exception e){
+ throw new ParseException(e.getMessage());
+ }
+ }
+ )?
)
{ return copyItemAndAddSpec(item,new SortDistinctSpec(false, sortSpec)); }
}
@@ -932,6 +954,7 @@
return funcName;
}
}
+
/**
* Bug 831620 - '$' support
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java Tue Jan 22 13:17:12 2008
@@ -17,8 +17,6 @@
*/
package org.apache.pig.impl.mapreduceExec;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -29,19 +27,21 @@
import org.apache.log4j.Logger;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
import org.apache.pig.data.IndexedTuple;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.Datum;
-import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.eval.EvalSpec;
import org.apache.pig.impl.io.PigFile;
import org.apache.pig.impl.physicalLayer.POMapreduce;
import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.PigLogger;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.PigLogger;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapred.JobClient;
@@ -65,7 +65,17 @@
numMRJobs = numMRJobsIn;
mrJobNumber = 0;
}
-
+
+    /**
+     * Compares serialized {@link Tuple} keys directly on their raw bytes
+     * (lexicographic byte order of the serialized form) without
+     * deserializing them. This order is not necessarily the tuples' natural
+     * order. launchPig installs it only when no quantiles file is set; the
+     * comment there notes this is for non-sort jobs.
+     */
+    public static class PigWritableComparator extends WritableComparator {
+        public PigWritableComparator() {
+            super(Tuple.class);
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1,
+                           byte[] b2, int s2, int l2) {
+            // Pure byte comparison: no deserialization of either key.
+            return compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+    }
+
static Random rand = new Random();
/**
@@ -91,7 +101,7 @@
* @throws IOException
*/
public boolean launchPig(POMapreduce pom) throws IOException {
- Logger log = PigLogger.getLogger();
+ Logger log = PigLogger.getLogger();
JobConf conf = new JobConf(pom.pigContext.getConf());
conf.setJobName(pom.pigContext.getJobName());
boolean success = false;
@@ -112,7 +122,7 @@
{
FileOutputStream fos = new FileOutputStream(submitJarFile);
JarManager.createJar(fos, funcs, pom.pigContext);
- System.out.println("Job jar size = " + submitJarFile.length());
+ log.debug("Job jar size = " + submitJarFile.length());
conf.setJar(submitJarFile.getPath());
String user = System.getProperty("user.name");
conf.setUser(user != null ? user : "Pigster");
@@ -133,20 +143,28 @@
conf.set("pig.pigContext", ObjectSerializer.serialize(pom.pigContext));
conf.setMapRunnerClass(PigMapReduce.class);
- if (pom.toCombine != null)
- conf.setCombinerClass(PigCombine.class);
+ if (pom.toCombine != null) {
+ conf.setCombinerClass(PigCombine.class);
+ //conf.setCombinerClass(PigMapReduce.class);
+ }
if (pom.quantilesFile!=null){
conf.set("pig.quantilesFile", pom.quantilesFile);
- }
+ }
+ else{
+ // this is not a sort job - can use byte comparison to speed up processing
+ conf.setOutputKeyComparatorClass(PigWritableComparator.class);
+ }
if (pom.partitionFunction!=null){
conf.setPartitionerClass(SortPartitioner.class);
}
conf.setReducerClass(PigMapReduce.class);
conf.setInputFormat(PigInputFormat.class);
conf.setOutputFormat(PigOutputFormat.class);
- conf.setInputKeyClass(UTF8.class);
- conf.setInputValueClass(Tuple.class);
+ // not used starting with 0.15 conf.setInputKeyClass(Text.class);
+ // not used starting with 0.15 conf.setInputValueClass(Tuple.class);
conf.setOutputKeyClass(Tuple.class);
+ if (pom.userComparator != null)
+ conf.setOutputKeyComparatorClass(pom.userComparator);
conf.setOutputValueClass(IndexedTuple.class);
conf.set("pig.inputs", ObjectSerializer.serialize(pom.inputFileSpecs));
@@ -212,8 +230,8 @@
// create an empty output file
PigFile f = new PigFile(outputFile.toString(), false);
- f.store(new DataBag(Datum.DataType.TUPLE),
- new PigStorage(), pom.pigContext);
+ f.store(BagFactory.getInstance().newDefaultBag(),
+ new PigStorage(), pom.pigContext);
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigCombine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigCombine.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigCombine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigCombine.java Tue Jan 22 13:17:12 2008
@@ -28,10 +28,10 @@
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.BigDataBag;
-import org.apache.pig.data.Datum;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.IndexedTuple;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.eval.EvalSpec;
import org.apache.pig.impl.eval.collector.DataCollector;
@@ -47,17 +47,20 @@
private OutputCollector oc;
private int index;
private int inputCount;
- private BigDataBag bags[];
- private File tmpdir;
+ private DataBag bags[];
+ private TupleFactory mTupleFactory = TupleFactory.getInstance();
+ // private File tmpdir;
public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
throws IOException {
try {
+ /*
tmpdir = new File(job.get("mapred.task.id"));
tmpdir.mkdirs();
BagFactory.init(tmpdir);
+ */
PigContext pigContext = (PigContext) ObjectSerializer.deserialize(job.get("pig.pigContext"));
if (evalPipe == null) {
inputCount = ((ArrayList<FileSpec>)ObjectSerializer.deserialize(job.get("pig.inputs"))).size();
@@ -69,37 +72,37 @@
evalPipe = esp.setupPipe(finalout);
//throw new RuntimeException("combine spec: " + evalSpec + " combine pipe: " + esp.toString());
- bags = new BigDataBag[inputCount];
+ bags = new DataBag[inputCount];
for (int i = 0; i < inputCount; i++) {
- bags[i] = BagFactory.getInstance().getNewBigBag(Datum.DataType.TUPLE);
+ bags[i] = BagFactory.getInstance().newDefaultBag();
}
}
PigSplit split = PigInputFormat.PigRecordReader.getPigRecordReader().getPigFileSplit();
index = split.getIndex();
- Datum groupName = ((Tuple) key).getField(0);
+ String groupName = (String)((Tuple) key).get(0);
finalout.group = ((Tuple) key);
finalout.index = index;
- Tuple t = new Tuple(1 + inputCount);
- t.setField(0, groupName);
+ Tuple t = mTupleFactory.newTuple(1 + inputCount);
+ t.set(0, groupName);
for (int i = 1; i < 1 + inputCount; i++) {
bags[i - 1].clear();
- t.setField(i, bags[i - 1]);
+ t.set(i, bags[i - 1]);
}
while (values.hasNext()) {
IndexedTuple it = (IndexedTuple) values.next();
- t.getBagField(it.index + 1).add(it);
+ ((DataBag)t.get(it.index + 1)).add(it.toTuple());
}
for (int i = 0; i < inputCount; i++) { // XXX: shouldn't we only do this if INNER flag is set?
- if (t.getBagField(1 + i).isEmpty())
+ if (((DataBag)t.get(1 + i)).size() == 0)
return;
}
// throw new RuntimeException("combine input: " + t.toString());
evalPipe.add(t);
- evalPipe.add(null); // EOF marker
+ // evalPipe.add(null); // EOF marker
} catch (Throwable tr) {
tr.printStackTrace();
RuntimeException exp = new RuntimeException(tr.getMessage());
@@ -132,10 +135,11 @@
}
@Override
- public void add(Datum d){
+ public void add(Object d){
if (d == null) return; // EOF marker from eval pipeline; ignore
try{
- oc.collect(group, new IndexedTuple(((Tuple)d).getTupleField(0),index));
+ // oc.collect(group, new IndexedTuple(((Tuple)d).getTupleField(0),index));
+ oc.collect(group, new IndexedTuple(((Tuple)d),index));
}catch (IOException e){
throw new RuntimeException(e);
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigInputFormat.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigInputFormat.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigInputFormat.java Tue Jan 22 13:17:12 2008
@@ -24,7 +24,8 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -37,6 +38,7 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.pig.LoadFunc;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.eval.EvalSpec;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
@@ -45,7 +47,7 @@
import org.apache.tools.bzip2r.CBZip2InputStream;
-public class PigInputFormat implements InputFormat, JobConfigurable {
+public class PigInputFormat implements InputFormat<Text, Tuple>, JobConfigurable {
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
@@ -87,15 +89,15 @@
//paths.add(path);
for (int j = 0; j < paths.size(); j++) {
Path fullPath = new Path(fs.getWorkingDirectory(), paths.get(j));
- if (fs.isDirectory(fullPath)) {
- Path children[] = fs.listPaths(fullPath);
+ if (fs.getFileStatus(fullPath).isDir()) {
+ FileStatus children[] = fs.listStatus(fullPath);
for(int k = 0; k < children.length; k++) {
- paths.add(children[k]);
+ paths.add(children[k].getPath());
}
continue;
}
- long bs = fs.getBlockSize(fullPath);
- long size = fs.getLength(fullPath);
+ long bs = fs.getFileStatus(fullPath).getBlockSize();
+ long size = fs.getFileStatus(fullPath).getLen();
long pos = 0;
String name = paths.get(j).getName();
if (name.endsWith(".gz")) {
@@ -114,7 +116,7 @@
return splits.toArray(new PigSplit[splits.size()]);
}
- public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ public RecordReader<Text, Tuple> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
PigRecordReader r = new PigRecordReader(job, (PigSplit)split, compressionCodecs);
return r;
}
@@ -127,7 +129,7 @@
codecList = conf.get("io.compression.codecs", "none");
}
- public static class PigRecordReader implements RecordReader {
+ public static class PigRecordReader implements RecordReader<Text, Tuple> {
/**
* This is a tremendously ugly hack to get around the fact that mappers do not have access
* to their readers. We take advantage of the fact that RecordReader.next and Mapper.map is
@@ -146,6 +148,7 @@
LoadFunc loader;
CompressionCodecFactory compressionFactory;
JobConf job;
+ TupleFactory mTupleFactory = TupleFactory.getInstance();
PigRecordReader(JobConf job, PigSplit split, CompressionCodecFactory compressionFactory) throws IOException {
this.split = split;
@@ -182,15 +185,15 @@
public JobConf getJobConf(){
return job;
}
-
- public boolean next(Writable key, Writable value) throws IOException {
+
+ public boolean next(Text key, Tuple value) throws IOException {
Tuple t = loader.getNext();
if (t == null) {
return false;
}
- ((UTF8) key).set(split.getPath().getName());
- ((Tuple)value).copyFrom(t);
+ key.set(split.getPath().getName());
+ value.reference(t);
return true;
}
@@ -206,19 +209,19 @@
return split;
}
- public WritableComparable createKey() {
- return new UTF8();
+ public Text createKey() {
+ return new Text();
}
- public Writable createValue() {
- return new Tuple();
+ public Tuple createValue() {
+ return mTupleFactory.newTuple();
}
- public float getProgress() throws IOException {
- float progress = getPos() - split.getStart();
- float finish = split.getLength();
- return progress/finish;
- }
+ public float getProgress() throws IOException {
+ float progress = getPos() - split.getStart();
+ float finish = split.getLength();
+ return progress/finish;
+ }
}
public void validateInput(JobConf arg0) throws IOException {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java Tue Jan 22 13:17:12 2008
@@ -36,9 +36,9 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Datum;
import org.apache.pig.data.IndexedTuple;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.eval.EvalSpec;
import org.apache.pig.impl.eval.StarSpec;
@@ -87,9 +87,10 @@
private int index;
private int inputCount;
private boolean isInner[];
- private File tmpdir;
+ // private File tmpdir;
private static PigContext pigContext = null;
ArrayList<PigRecordWriter> sideFileWriters = new ArrayList<PigRecordWriter>();
+ TupleFactory mTupleFactory = TupleFactory.getInstance();
/**
* This function is called in MapTask by Hadoop as the Mapper.run() method. We basically pull
@@ -100,9 +101,11 @@
PigMapReduce.reporter = reporter;
oc = output;
+ /*
tmpdir = new File(job.get("mapred.task.id"));
tmpdir.mkdirs();
BagFactory.init(tmpdir);
+ */
setupMapPipe(reporter);
@@ -125,10 +128,12 @@
PigMapReduce.reporter = reporter;
try {
+ /*
tmpdir = new File(job.get("mapred.task.id"));
tmpdir.mkdirs();
BagFactory.init(tmpdir);
+ */
oc = output;
if (evalPipe == null) {
@@ -136,22 +141,21 @@
}
DataBag[] bags = new DataBag[inputCount];
- Datum groupName = ((Tuple) key).getField(0);
- Tuple t = new Tuple(1 + inputCount);
- t.setField(0, groupName);
+ String groupName = (String)((Tuple) key).get(0);
+ Tuple t = mTupleFactory.newTuple(1 + inputCount);
+ t.set(0, groupName);
for (int i = 1; i < 1 + inputCount; i++) {
- bags[i - 1] =
- BagFactory.getInstance().getNewBag(Datum.DataType.TUPLE);
- t.setField(i, bags[i - 1]);
+ bags[i - 1] = BagFactory.getInstance().newDefaultBag();
+ t.set(i, bags[i - 1]);
}
while (values.hasNext()) {
IndexedTuple it = (IndexedTuple) values.next();
- t.getBagField(it.index + 1).add(it.toTuple());
+ ((DataBag)t.get(it.index + 1)).add(it.toTuple());
}
for (int i = 0; i < inputCount; i++) {
- if (isInner[i] && t.getBagField(1 + i).isEmpty())
+ if (isInner[i] && ((DataBag)t.get(1 + i)).size() == 0)
return;
}
@@ -300,14 +304,15 @@
}
@Override
- public void add(Datum d){
+ public void add(Object d){
try{
if (group == null) {
oc.collect(null, (Tuple)d);
} else {
- Datum[] groupAndTuple = LOCogroup.getGroupAndTuple(d);
+ Object[] groupAndTuple = LOCogroup.getGroupAndTuple(d);
// wrap group label in a tuple, so it becomes writable.
- oc.collect(new Tuple(groupAndTuple[0]), new IndexedTuple((Tuple)groupAndTuple[1], index));
+ oc.collect(mTupleFactory.newTuple(groupAndTuple[0]),
+ new IndexedTuple((Tuple)groupAndTuple[1], index));
}
}catch(IOException e){
throw new RuntimeException(e);
@@ -329,7 +334,7 @@
}
@Override
- public void add(Datum d){
+ public void add(Object d){
try{
//System.out.println("Adding " + d + " to reduce output");
oc.collect(null, (Tuple)d);
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigSplit.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigSplit.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigSplit.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigSplit.java Tue Jan 22 13:17:12 2008
@@ -161,7 +161,9 @@
try{
return ois.readObject();
}catch (ClassNotFoundException cnfe){
- throw new IOException(cnfe);
+ IOException newE = new IOException(cnfe.getMessage());
+ newE.initCause(cnfe);
+ throw newE;
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java Tue Jan 22 13:17:12 2008
@@ -15,62 +15,66 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.pig.impl.mapreduceExec;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
+package org.apache.pig.impl.mapreduceExec;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.io.FileLocalizer;
-
-
-public class SortPartitioner implements Partitioner {
- Tuple[] quantiles;
-
- public int getPartition(WritableComparable key, Writable value,
- int numPartitions) {
- try{
- Tuple keyTuple = (Tuple)key;
- int index = Arrays.binarySearch(quantiles, keyTuple.getTupleField(0));
- if (index < 0)
- index = -index-1;
- return Math.min(index, numPartitions - 1);
- }catch(IOException e){
- throw new RuntimeException(e);
- }
- }
-
- public void configure(JobConf job) {
- String quantilesFile = job.get("pig.quantilesFile", "");
- if (quantilesFile.length() == 0)
- throw new RuntimeException("Sort paritioner used but no quantiles found");
-
- try{
- InputStream is = FileLocalizer.openDFSFile(quantilesFile,job);
- BinStorage loader = new BinStorage();
- loader.bindTo(quantilesFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
-
- Tuple t;
- ArrayList<Tuple> quantiles = new ArrayList<Tuple>();
-
- while(true){
- t = loader.getNext();
- if (t==null)
- break;
- quantiles.add(t);
- }
- this.quantiles = quantiles.toArray(new Tuple[0]);
- }catch (IOException e){
- throw new RuntimeException(e);
- }
- }
-
-}
+
+
+/**
+ * Partitioner for sort jobs. A key's partition is the insertion point of its
+ * first field among precomputed quantile tuples (ordered by the job's output
+ * key comparator), capped at {@code numPartitions - 1}.
+ */
+public class SortPartitioner implements Partitioner {
+    /** Quantile boundaries loaded from the file named by "pig.quantilesFile". */
+    Tuple[] quantiles;
+    /** The job's output key comparator, used for the binary search. */
+    WritableComparator comparator;
+
+    public int getPartition(WritableComparable key, Writable value,
+                            int numPartitions) {
+        try {
+            Tuple keyTuple = (Tuple) key;
+            // The key's first field is itself a Tuple; that is what is
+            // compared against the quantiles.
+            int index = Arrays.binarySearch(quantiles, (Tuple) keyTuple.get(0), comparator);
+            if (index < 0) {
+                index = -index - 1; // not found: use the insertion point
+            }
+            return Math.min(index, numPartitions - 1);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Loads the quantile tuples and captures the output key comparator.
+     *
+     * @throws RuntimeException if "pig.quantilesFile" is unset or the file
+     *         cannot be read
+     */
+    public void configure(JobConf job) {
+        String quantilesFile = job.get("pig.quantilesFile", "");
+        if (quantilesFile.length() == 0)
+            throw new RuntimeException("Sort paritioner used but no quantiles found");
+
+        InputStream is = null;
+        try {
+            is = FileLocalizer.openDFSFile(quantilesFile, job);
+            BinStorage loader = new BinStorage();
+            loader.bindTo(quantilesFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
+
+            ArrayList<Tuple> quantiles = new ArrayList<Tuple>();
+            Tuple t;
+            while ((t = loader.getNext()) != null) {
+                quantiles.add(t);
+            }
+            this.quantiles = quantiles.toArray(new Tuple[quantiles.size()]);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            // Previously the stream was never closed (resource leak).
+            closeQuietly(is);
+        }
+
+        comparator = job.getOutputKeyComparator();
+    }
+
+    /** Closes {@code is} if non-null; a failure to close is not fatal here. */
+    private static void closeQuietly(InputStream is) {
+        if (is == null) {
+            return;
+        }
+        try {
+            is.close();
+        } catch (IOException ignored) {
+            // Reading is finished (or already failed); nothing to recover.
+        }
+    }
+}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/IntermedResult.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/IntermedResult.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/IntermedResult.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/IntermedResult.java Tue Jan 22 13:17:12 2008
@@ -24,8 +24,8 @@
import org.apache.pig.PigServer.ExecType;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.Datum;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.io.PigFile;
@@ -59,7 +59,7 @@
     public IntermedResult() {
         executed = true;
-        databag = new DataBag(Datum.DataType.TUPLE);
+        // Obtain the result bag from BagFactory instead of constructing a DataBag directly.
+        databag = BagFactory.getInstance().newDefaultBag();
     }
public IntermedResult(DataBag bag) {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java Tue Jan 22 13:17:12 2008
@@ -19,19 +19,29 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.Map;
+import java.util.Iterator;
+import org.apache.hadoop.io.WritableComparator;
import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.FunctionInstantiator;
import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.builtin.RandomSampleLoader;
+import org.apache.pig.impl.eval.BinCondSpec;
import org.apache.pig.impl.eval.ConstSpec;
import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.eval.FilterSpec;
import org.apache.pig.impl.eval.FuncEvalSpec;
import org.apache.pig.impl.eval.GenerateSpec;
import org.apache.pig.impl.eval.ProjectSpec;
+import org.apache.pig.impl.eval.CompositeEvalSpec;
+import org.apache.pig.impl.eval.MapLookupSpec;
import org.apache.pig.impl.eval.SortDistinctSpec;
import org.apache.pig.impl.eval.StarSpec;
+import org.apache.pig.impl.eval.EvalSpecVisitor;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.LOCogroup;
@@ -194,9 +204,35 @@
} else { // push into "reduce" phase
- // use combiner, if amenable
- if (mro.toReduce == null && lo.getSpec().amenableToCombiner()) {
- //TODO
+ EvalSpec spec = lo.getSpec();
+
+ if (mro.toReduce == null && shouldCombine(spec)) {
+ // Push this spec into the combiner. But we also need to
+ // create a new spec with a changed expected projection to
+ // push into the reducer.
+
+ if (mro.toCombine != null) {
+ throw new AssertionError("Combiner already set.");
+ }
+ // mro.toCombine = spec;
+
+ // Now, we need to adjust the expected projection for the
+ // eval spec(s) we just pushed. Also, this will change the
+ // function to be the final instead of general instance.
+ EvalSpec newSpec = spec.copy(pigContext);
+ newSpec.visit(new ReduceAdjuster(pigContext));
+ mro.addReduceSpec(newSpec);
+
+ // Adjust the function name for the combine spec, to set it
+ // to the initial function instead of the general
+ // instance. Make a copy of the eval spec rather than
+ // adjusting the existing one, to prevent breaking the
+ // logical plan in case another physical plan is generated
+ // from it later.
+ EvalSpec combineSpec = spec.copy(pigContext);
+ combineSpec.visit(new CombineAdjuster());
+ mro.toCombine = combineSpec;
+
} else {
mro.addReduceSpec(lo.getSpec()); // otherwise, don't use combiner
}
@@ -271,7 +307,216 @@
sortJob.addReduceSpec(new GenerateSpec(ps));
sortJob.reduceParallelism = loSort.getRequestedParallelism();
+
+ String comparatorFuncName = loSort.getSortSpec().getComparatorName();
+ if (comparatorFuncName != null) {
+ try {
+ sortJob.userComparator =
+ (Class<WritableComparator>)Class.forName(comparatorFuncName);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Unable to find user comparator " + comparatorFuncName, e);
+ }
+ }
+
return sortJob;
}
-
+
+    /**
+     * Decides whether a reduce-side eval spec can be split between the
+     * combiner and the reducer. The spec qualifies only when it is a
+     * generate whose first child projects column 0 alone, whose remaining
+     * children are all function evaluations, and which a full walk with
+     * {@link CombineDeterminer} reports as combinable.
+     *
+     * @param spec the eval spec that would otherwise be pushed into the reducer
+     * @return true if the combiner should be used for {@code spec}
+     */
+    private boolean shouldCombine(EvalSpec spec) {
+        // First, it must be a generate spec.
+        if (!(spec instanceof GenerateSpec)) {
+            return false;
+        }
+
+        GenerateSpec gen = (GenerateSpec)spec;
+
+        // Second, the first immediate child of the generate spec must be
+        // a project of column 0 (and of no other column).
+        Iterator<EvalSpec> i = gen.getSpecs().iterator();
+        if (!i.hasNext()) return false;
+        EvalSpec s = i.next();
+        if (!(s instanceof ProjectSpec)) {
+            return false;
+        } else {
+            ProjectSpec p = (ProjectSpec)s;
+            if (p.numCols() > 1) return false;
+            else if (p.getCol() != 0) return false;
+        }
+
+        // Third, all subsequent immediate children of the generate spec
+        // must be func eval specs.
+        while (i.hasNext()) {
+            s = i.next();
+            if (!(s instanceof FuncEvalSpec)) return false;
+        }
+
+        // Fourth, walk the entire tree of the generate spec and see if we
+        // can combine it.
+        CombineDeterminer cd = new CombineDeterminer();
+        gen.visit(cd);
+        return cd.useCombiner();
+    }
+
+    /**
+     * Adjusts a copy of a combinable generate spec for the reduce phase.
+     * Each function's arguments are replaced by a projection into the
+     * combiner's output, and the function is switched from its general
+     * instance to its final instance.
+     */
+    private static class ReduceAdjuster extends EvalSpecVisitor {
+        /** Index of the generate child currently being visited. */
+        private int position = 0;
+        /** Used to instantiate each function before it is reset to its final instance. */
+        private final FunctionInstantiator instantiator;
+
+        public ReduceAdjuster(FunctionInstantiator fi) {
+            instantiator = fi;
+        }
+
+        @Override
+        public void visitGenerate(GenerateSpec g) {
+            Iterator<EvalSpec> i = g.getSpecs().iterator();
+            for (position = 0; i.hasNext(); position++) {
+                i.next().visit(this);
+            }
+        }
+
+        @Override
+        public void visitFuncEval(FuncEvalSpec fe) {
+            // Need to replace our arg spec with a project of our position.
+            // DON'T visit our args, they're exactly what we're trying to
+            // lop off.
+            // The first ProjectSpec in the Composite is because the tuples
+            // will come out of the combiner in the form (groupkey,
+            // {(x, y, z)}). The second ProjectSpec contains the offset of
+            // the projection element we're interested in.
+            CompositeEvalSpec cs = new CompositeEvalSpec(new ProjectSpec(1));
+            cs.addSpec(new ProjectSpec(position));
+            fe.setArgs(new GenerateSpec(cs));
+
+            // Reset the function to call the final instance of itself
+            // instead of the general instance. Have to instantiate the
+            // function itself first so we can find out if it's algebraic
+            // or not.
+            try {
+                fe.instantiateFunc(instantiator);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            fe.resetFuncToFinal();
+        }
+    }
+
+    /**
+     * Adjusts a copy of a combinable generate spec for the combine phase:
+     * every function is switched to its initial instance, and flattening is
+     * turned off on functions and projections.
+     *
+     * The combiner must not flatten, because the column numbers in the
+     * reduce spec are computed as if there were no combiner; flattening in
+     * the combiner would shift them. Combining is currently only attempted
+     * for {@code generate group, func1(), func2(), ...}, so visiting
+     * function and projection specs is sufficient.
+     */
+    private static class CombineAdjuster extends EvalSpecVisitor {
+        @Override
+        public void visitFuncEval(FuncEvalSpec fe) {
+            // Reset the function to call the initial instance of itself
+            // instead of the general instance.
+            fe.resetFuncToInitial();
+            fe.setFlatten(false);
+        }
+
+        @Override
+        public void visitProject(ProjectSpec p) {
+            p.setFlatten(false);
+        }
+    }
+
+    /**
+     * Visitor that decides whether a generate spec may use the combiner.
+     * The verdict is tracked in {@code shouldCombine}: 0 means no combinable
+     * function has been seen yet, 1 means at least one has, and -1 is a veto.
+     * A veto is final. If a combinable function visited after a
+     * non-combinable spec could flip the verdict back to 1, the
+     * non-combinable function would also be rewritten for the combiner.
+     */
+    private static class CombineDeterminer extends EvalSpecVisitor {
+        /**
+         * Inspects a function's argument spec and clears {@code combinable}
+         * if the visit reaches a bincond, filter, nested function, or
+         * sort/distinct spec.
+         */
+        private static class FuncCombinable extends EvalSpecVisitor {
+            public boolean combinable = true;
+
+            @Override
+            public void visitBinCond(BinCondSpec bc) {
+                combinable = false;
+            }
+
+            @Override
+            public void visitFilter(FilterSpec f) {
+                combinable = false;
+            }
+
+            @Override
+            public void visitFuncEval(FuncEvalSpec fe) {
+                combinable = false;
+            }
+
+            @Override
+            public void visitSortDistinct(SortDistinctSpec sd) {
+                combinable = false;
+            }
+        }
+
+        /** 0 = undecided, 1 = combinable function seen, -1 = vetoed (final). */
+        private int shouldCombine = 0;
+
+        public boolean useCombiner() {
+            return shouldCombine > 0;
+        }
+
+        /** Records that the spec cannot be combined; nothing may undo this. */
+        private void veto() {
+            shouldCombine = -1;
+        }
+
+        /** True once any visited spec has ruled out combining. */
+        private boolean vetoed() {
+            return shouldCombine < 0;
+        }
+
+        @Override
+        public void visitBinCond(BinCondSpec bc) {
+            // TODO Could be true if both are true. But the logic in
+            // CombineAdjuster and ReduceAdjuster don't know how to handle
+            // binconds, so just do false for now.
+            veto();
+        }
+
+        @Override
+        public void visitCompositeEval(CompositeEvalSpec ce) {
+            for (EvalSpec spec: ce.getSpecs()) {
+                // If we've already determined we're not combinable, stop.
+                if (vetoed()) return;
+                spec.visit(this);
+            }
+        }
+
+        // ConstSpec is a NOP, as it neither will benefit from nor
+        // prevents combinability.
+
+        @Override
+        public void visitFilter(FilterSpec f) {
+            veto();
+        }
+
+        @Override
+        public void visitFuncEval(FuncEvalSpec fe) {
+            // A combinable function must never override an earlier veto.
+            if (vetoed()) return;
+
+            // Check the function's arguments, to make sure they are
+            // combinable, and then the function itself.
+            FuncCombinable fc = new FuncCombinable();
+            fe.getArgs().visit(fc);
+            if (!fc.combinable || !fe.combinable()) {
+                veto();
+            } else {
+                shouldCombine = 1;
+            }
+        }
+
+        @Override
+        public void visitGenerate(GenerateSpec g) {
+            for (EvalSpec spec: g.getSpecs()) {
+                // If we've already determined we're not combinable, stop.
+                if (vetoed()) return;
+                spec.visit(this);
+            }
+        }
+
+        // MapLookupSpec is a NOP, as it neither will benefit from nor
+        // prevents combinability.
+
+        // ProjectSpec is a NOP, as it neither will benefit from nor
+        // prevents combinability.
+
+        @Override
+        public void visitSortDistinct(SortDistinctSpec sd) {
+            veto();
+        }
+
+        // StarSpec is a NOP, as it neither will benefit from nor
+        // prevents combinability.
+    }
+
}