You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/19 17:15:20 UTC
svn commit: r657848 - in /incubator/pig/branches/types: ./
src/org/apache/pig/ src/org/apache/pig/builtin/ src/org/apache/pig/data/
src/org/apache/pig/impl/logicalLayer/
Author: gates
Date: Mon May 19 08:15:19 2008
New Revision: 657848
URL: http://svn.apache.org/viewvc?rev=657848&view=rev
Log:
Fixed a few TODO FIXes; added DOUBLE to isAtomic in DataType.
Modified:
incubator/pig/branches/types/build.xml
incubator/pig/branches/types/src/org/apache/pig/PigServer.java
incubator/pig/branches/types/src/org/apache/pig/builtin/ARITY.java
incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java
incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java
incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java
incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java
incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java
incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java
Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Mon May 19 08:15:19 2008
@@ -134,6 +134,7 @@
<target name="compile-sources">
<javac encoding="${build.encoding}" srcdir="${sources}"
includes="**/plan/*.java, **/plan/optimizer/*.java, **/data/*.java, **/pig/builtin/*.java,
+ **/impl/io/*.java, **/impl/mapReduceLayer/*.java,
**/test/utils/*.java, **/test/TestOperatorPlan.java, **/test/TestBuiltin.java,
**/test/TestConstExpr.java, **/test/TestFilter.java, **/test/TestPhyOp.java,
**/test/TestAdd.java, **/test/TestSubtract.java, **/test/TestMultiply.java,
Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Mon May 19 08:15:19 2008
@@ -257,7 +257,6 @@
LogicalPlan readFrom = (LogicalPlan) aliases.get(id);
- // TODO FIX Make this work
try {
ExecPhysicalPlan pp =
pigContext.getExecutionEngine().compile(readFrom, null);
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/ARITY.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/ARITY.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/ARITY.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/ARITY.java Mon May 19 08:15:19 2008
@@ -20,6 +20,7 @@
import java.io.IOException;
import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -32,8 +33,6 @@
@Override
public Schema outputSchema(Schema input) {
- // TODO FIX
- // return new AtomSchema("arity");
- return null;
+ // One unnamed (null alias) INTEGER field; the old AtomSchema
+ // version named this column "arity".
+ return new Schema(new Schema.FieldSchema(null, DataType.INTEGER));
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java Mon May 19 08:15:19 2008
@@ -162,9 +162,7 @@
@Override
public Schema outputSchema(Schema input) {
- // TODO FIX
- // return new AtomSchema("average");
- return null;
+ // One unnamed (null alias) DOUBLE field; the old AtomSchema
+ // version named this column "average".
+ return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java Mon May 19 08:15:19 2008
@@ -127,9 +127,7 @@
@Override
public Schema outputSchema(Schema input) {
- // TODO FIX
- // return new AtomSchema("count" + count++);
- return null;
+ // One unnamed (null alias) LONG field; the old version named it
+ // "count" + count++.
+ // NOTE(review): this method no longer reads count below; remove the
+ // field if nothing else in the class uses it.
+ return new Schema(new Schema.FieldSchema(null, DataType.LONG));
}
private static int count = 1;
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java Mon May 19 08:15:19 2008
@@ -113,9 +113,7 @@
@Override
public Schema outputSchema(Schema input) {
- // TODO FIX
- // return new AtomSchema("max" + count++);
- return null;
+ // One unnamed (null alias) DOUBLE field; the old version named it
+ // "max" + count++.
+ // NOTE(review): this method no longer reads count below; remove the
+ // field if nothing else in the class uses it.
+ return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}
private static int count = 1;
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java Mon May 19 08:15:19 2008
@@ -109,9 +109,7 @@
@Override
public Schema outputSchema(Schema input) {
- // TODO FIX
- // return new AtomSchema("min" + count++);
- return null;
+ // One unnamed (null alias) DOUBLE field; the old version named it
+ // "min" + count++.
+ // NOTE(review): this method no longer reads count below; remove the
+ // field if nothing else in the class uses it.
+ return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}
private static int count = 1;
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java Mon May 19 08:15:19 2008
@@ -112,9 +112,7 @@
@Override
public Schema outputSchema(Schema input) {
- // TODO FIX
- // return new AtomSchema("sum" + count++);
- return null;
+ // One unnamed (null alias) DOUBLE field; the old version named it
+ // "sum" + count++.
+ // NOTE(review): this method no longer reads count below; remove the
+ // field if nothing else in the class uses it.
+ return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}
private static int count = 1;
Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataType.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataType.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataType.java Mon May 19 08:15:19 2008
@@ -426,7 +426,8 @@
public static boolean isAtomic(byte dataType) {
+ // DOUBLE is newly treated as atomic; BOOLEAN must stay in the list.
return ((dataType == BYTEARRAY) || (dataType == CHARARRAY) ||
(dataType == INTEGER) || (dataType == LONG) ||
- (dataType == FLOAT) || (dataType == BOOLEAN));
+ (dataType == FLOAT) || (dataType == DOUBLE) ||
+ (dataType == BOOLEAN));
}
/**
Modified: incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java Mon May 19 08:15:19 2008
@@ -348,7 +348,8 @@
* Report progress to HDFS.
*/
protected void reportProgress() {
- // TODO FIX
+ // TODO FIX Need to hook this into the progress reporting
+ // infrastructure once Shravan finishes that.
/*
if (PigMapReduce.reporter != null) {
PigMapReduce.reporter.progress();
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java Mon May 19 08:15:19 2008
@@ -18,12 +18,15 @@
package org.apache.pig.impl.logicalLayer;
import java.util.ArrayList;
-import java.util.List;
import java.util.Iterator;
+import java.util.List;
import java.io.PrintStream;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.MultiMap;
import org.apache.pig.impl.plan.VisitorException;
/**
@@ -31,40 +34,194 @@
*/
public class LOPrinter extends LOVisitor {
- public LOPrinter(LogicalPlan plan) {
+ private PrintStream mStream = null;
+ private int mIndent = 0;
+
+ /**
+ * Create a printer that walks the plan depth first and writes a
+ * description of each operator to ps.
+ * @param ps PrintStream to output plan information to
+ * @param plan Logical plan to print
+ */
+ public LOPrinter(PrintStream ps, LogicalPlan plan) {
super(plan, new DepthFirstWalker(plan));
+ // Every visit method prints through mStream, so it must be set here.
+ mStream = ps;
}
- // TODO FIX
- /*
- private PrintStream mStream = null;
-
- public LOPrinter(PrintStream ps) {
- mStream = ps;
+ public void visit(LOAdd a) throws VisitorException {
+ visitBinary(a, "+");
}
- /**
- * Only LOCogroup.visit() should ever call this method.
- */
- /*
- @Override
- public void visitCogroup(LOCogroup g) {
- print(g, g.name(), g.getSpecs());
- super.visitCogroup(g);
+ public void visit(LOAnd a) throws VisitorException {
+ visitBinary(a, "AND");
}
-
- /**
- * Only LOEval.visit() should ever call this method.
- */
- /*
- @Override
- public void visitEval(LOEval e) {
- List<EvalSpec> ls = new ArrayList<EvalSpec>();
- ls.add(e.getSpec());
- print(e, e.name(), ls);
- super.visitEval(e);
+
+ /**
+ * Print a bincond as COND: (cond) TRUE: (lhs) FALSE: (rhs).
+ */
+ public void visit(LOBinCond bc) throws VisitorException {
+ print(bc);
+ mStream.print(" COND: (");
+ bc.getCond().visit(this);
+ mStream.print(") TRUE: (");
+ bc.getLhsOp().visit(this);
+ // Same "LABEL: (" shape as COND and TRUE above.
+ mStream.print(") FALSE: (");
+ bc.getRhsOp().visit(this);
+ mStream.print(")");
+ }
+
+ public void visit(LOCogroup g) throws VisitorException {
+ print(g);
+ mStream.print("GROUP BY PLANS:");
+ MultiMap<LogicalOperator, LogicalPlan> plans = g.getGroupByPlans();
+ for (LogicalOperator lo : plans.keySet()) {
+ // Visit the associated plans
+ for (LogicalPlan plan : plans.get(lo)) {
+ mIndent++;
+ pushWalker(new DepthFirstWalker(plan));
+ visit();
+ popWalker();
+ mIndent--;
+ }
+ mStream.println();
+ }
+ // Visit input operators
+ for (LogicalOperator lo : plans.keySet()) {
+ // Visit the operator
+ lo.visit(this);
+ }
}
+ public void visit(LOConst c) throws VisitorException {
+ print(c);
+ mStream.print(" VALUE (" + c.getValue() + ")");
+ }
+
+ public void visit(LOCross c) throws VisitorException {
+ print(c);
+ mStream.println();
+ super.visit(c);
+ }
+
+ public void visit(LODistinct d) throws VisitorException {
+ print(d);
+ mStream.println();
+ super.visit(d);
+ }
+
+ public void visit(LODivide d) throws VisitorException {
+ visitBinary(d, "/");
+ }
+
+ public void visit(LOEqual e) throws VisitorException {
+ visitBinary(e, "==");
+ }
+
+ /**
+ * Print a filter, then its comparison plan (indented one level),
+ * then its input.
+ */
+ public void visit(LOFilter f) throws VisitorException {
+ print(f);
+ mStream.print(" COMP: ");
+ mIndent++;
+ pushWalker(new DepthFirstWalker(f.getComparisonPlan()));
+ visit();
+ // Restore the previous walker, as visit(LOCogroup) does; without
+ // this the pushed comparison-plan walker stays current.
+ popWalker();
+ mIndent--;
+ mStream.println();
+ f.getInput().visit(this);
+ }
+
+ /**
+ * Print a foreach, then its nested plan (indented one level), then
+ * its first predecessor in the plan.
+ */
+ public void visit(LOForEach f) throws VisitorException {
+ print(f);
+ mStream.print(" PLAN: ");
+ mIndent++;
+ pushWalker(new DepthFirstWalker(f.getForEachPlan()));
+ visit();
+ // Restore the previous walker, as visit(LOCogroup) does; without
+ // this the pushed foreach-plan walker stays current.
+ popWalker();
+ mIndent--;
+ mStream.println();
+ // Visit our input
+ mPlan.getPredecessors((LogicalOperator)f).get(0).visit(this);
+ }
+
+ public void visit(LOGreaterThan gt) throws VisitorException {
+ visitBinary(gt, ">");
+ }
+
+ public void visit(LOGreaterThanEqual gte) throws VisitorException {
+ visitBinary(gte, ">=");
+ }
+
+ public void visit(LOLesserThan lt) throws VisitorException {
+ visitBinary(lt, "<");
+ }
+
+ public void visit(LOLesserThanEqual lte) throws VisitorException {
+ visitBinary(lte, "<=");
+ }
+
+ public void visit(LOLoad load) throws VisitorException {
+ print(load);
+ mStream.print(" FILE: " + load.getInputFile().getFileName());
+ mStream.print(" FUNC: " + load.getLoadFunc().getClass().getName());
+ mStream.println();
+ }
+
+ public void visit(LOMapLookup mlu) throws VisitorException {
+ print(mlu);
+ mStream.print("(");
+ mlu.getMap().visit(this);
+ mStream.print(")# " + mlu.getKey());
+ }
+
+ public void visit(LOMod m) throws VisitorException {
+ visitBinary(m, "MOD");
+ }
+
+ public void visit(LOMultiply m) throws VisitorException {
+ visitBinary(m, "*");
+ }
+
+ public void visit(LONegative n) throws VisitorException {
+ visitUnary(n, "-");
+ }
+
+ public void visit(LONot n) throws VisitorException {
+ visitUnary(n, "NOT");
+ }
+
+ public void visit(LONotEqual ne) throws VisitorException {
+ visitBinary(ne, "!=");
+ }
+
+ public void visit(LOOr or) throws VisitorException {
+ visitBinary(or, "OR");
+ }
+
+ /**
+ * Print a projection: ALL for a star projection, otherwise the
+ * projected column indexes, followed by the expression projected from.
+ */
+ public void visit(LOProject p) throws VisitorException {
+ print(p);
+ if (p.isStar()) {
+ mStream.print(" ALL ");
+ } else {
+ List<Integer> cols = p.getProjection();
+ mStream.print(" COL");
+ if (cols.size() > 1) mStream.print("S");
+ mStream.print(" (");
+ for (int i = 0; i < cols.size(); i++) {
+ if (i > 0) mStream.print(", ");
+ mStream.print(cols.get(i));
+ }
+ mStream.print(")");
+ }
+ mStream.print(" FROM ");
+ if (p.getSentinel()) {
+ // This project is connected to some other relation, don't follow
+ // that path or we'll cycle in the graph. Print only its name.
+ mStream.print(p.getExpression().name());
+ } else {
+ mIndent++;
+ p.getExpression().visit(this);
+ mIndent--;
+ }
+ }
+
+ public void visit(LORegexp r) throws VisitorException {
+ print(r);
+ mStream.print(" REGEX (" + r.getRegexp() + ") LOOKING IN (");
+ r.getOperand().visit(this);
+ mStream.print(")");
+ }
+
/**
* Only LOUnion.visit() should ever call this method.
*/
@@ -76,16 +233,6 @@
}
/**
- * Only LOLoad.visit() should ever call this method.
- */
- /*
- @Override
- public void visitLoad(LOLoad l) {
- print(l, l.name());
- super.visitLoad(l);
- }
-
- /**
* Only LOSort.visit() should ever call this method.
*/
/*
@@ -121,55 +268,67 @@
List<EvalSpec> empty = new ArrayList<EvalSpec>();
print(lo, name, empty);
}
+ */
- private void print(
- LogicalOperator lo,
- String name,
- List<EvalSpec> specs) {
- mStream.println(name);
- mStream.println("Object id: " + lo.hashCode());
- mStream.print("Inputs: ");
- List<OperatorKey> inputs = lo.getInputs();
- Iterator<OperatorKey> i = inputs.iterator();
- while (i.hasNext()) {
- LogicalOperator input = lo.getOpTable().get(i.next());
- mStream.print(input.hashCode() + " ");
- }
- mStream.println();
-
- mStream.print("Schema: ");
- printSchema(lo.outputSchema(), 0);
- mStream.println();
-
- mStream.println("EvalSpecs:");
- //printSpecs(specs);
- Iterator<EvalSpec> j = specs.iterator();
- while (j.hasNext()) {
- j.next().visit(new EvalSpecPrinter(mStream));
+ private void visitBinary(
+ BinaryExpressionOperator b,
+ String op) throws VisitorException {
+ print(b);
+ mStream.print(" (");
+ b.getLhsOperand().visit(this);
+ mStream.print(") " + op + " (");
+ b.getRhsOperand().visit(this);
+ mStream.print(") ");
+ }
+
+ private void visitUnary(
+ UnaryExpressionOperator e,
+ String op) throws VisitorException {
+ print(e);
+ mStream.print(op + " (");
+ e.getOperand().visit(this);
+ mStream.print(") ");
+ }
+
+ /**
+ * Print the indentation and name of an operator. For non-expression
+ * operators, also print their inputs and schema.
+ */
+ private void print(LogicalOperator lo) {
+ for (int i = 0; i < mIndent; i++) mStream.print(" ");
+
+ printName(lo);
+
+ if (!(lo instanceof ExpressionOperator)) {
+ mStream.print("Inputs: ");
+ // NOTE(review): assumes getPredecessors can return null for root
+ // operators such as loads; confirm against OperatorPlan.
+ if (mPlan.getPredecessors(lo) != null) {
+ for (LogicalOperator predecessor : mPlan.getPredecessors(lo)) {
+ printName(predecessor);
+ }
+ }
+ mStream.print("Schema: ");
+ try {
+ printSchema(lo.getSchema());
+ } catch (FrontendException fe) {
+ // ignore it, nothing we can do
+ mStream.print("()");
+ }
}
+ mStream.print(" : ");
}
- private void printSchema(Schema schema, int pos) {
- if (schema instanceof AtomSchema) {
- String a = schema.getAlias();
- if (a == null) mStream.print("$" + pos);
- else mStream.print(a);
- } else if (schema instanceof TupleSchema) {
- mStream.print("(");
- TupleSchema ts = (TupleSchema)schema;
- int sz = ts.numFields();
- for (int j = 0; j < sz; j++) {
- if (j != 0) mStream.print(", ");
- Schema s = ts.schemaFor(j);
- if (s == null) mStream.print("$" + j);
- else printSchema(s, j);
+ private void printName(LogicalOperator lo) {
+ mStream.println(lo.name() + " key(" + lo.getOperatorKey().scope +
+ ", " + lo.getOperatorKey().id + ") ");
+ }
+
+ /**
+ * Print a schema as (alias: type, ...), recursing into nested schemas
+ * and wrapping bag schemas in braces.
+ */
+ private void printSchema(Schema schema) {
+ if (schema == null) {
+ // No schema available; print it as empty instead of throwing.
+ mStream.print("()");
+ return;
+ }
+ mStream.print("(");
+ boolean first = true;
+ for (Schema.FieldSchema fs : schema.getFields()) {
+ // Separate fields with ", " as the previous printSchema did.
+ if (!first) mStream.print(", ");
+ first = false;
+ if (fs.alias != null) mStream.print(fs.alias + ": ");
+ mStream.print(DataType.findTypeName(fs.type));
+ if (fs.schema != null) {
+ if (fs.type == DataType.BAG) mStream.print("{");
+ printSchema(fs.schema);
+ if (fs.type == DataType.BAG) mStream.print("}");
}
- mStream.print(")");
- } else {
- throw new AssertionError("Unknown schema type.");
}
+ mStream.print(")");
}
- */
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java Mon May 19 08:15:19 2008
@@ -115,6 +115,10 @@
return mExp;
}
+ /**
+ * @return true if this is a star (*) projection, i.e. the value of
+ * the mIsStar flag
+ */
+ public boolean isStar() {
+ return mIsStar;
+ }
+
public List<Integer> getProjection() {
return mProjection;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java Mon May 19 08:15:19 2008
@@ -120,6 +120,9 @@
for(LogicalOperator op: cg.getInputs()) {
for(LogicalPlan lp: mapGByPlans.get(op)) {
if (null != lp) {
+ // TODO FIX - How do we know this should be a
+ // DependencyOrderWalker? We should be replicating the
+ // walker the current visitor is using.
PlanWalker w = new DependencyOrderWalker(lp);
pushWalker(w);
for(LogicalOperator logicalOp: lp.getRoots()) {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java Mon May 19 08:15:19 2008
@@ -17,8 +17,6 @@
*/
package org.apache.pig.impl.logicalLayer;
-// TODO FIX
-
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Map;
@@ -33,7 +31,6 @@
*
*/
public class LogicalPlanBuilder {
- // TODO FIX
public static ClassLoader classloader = LogicalPlanBuilder.class.getClassLoader();
private PigContext pigContext;