You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by ga...@apache.org on 2007/11/13 21:40:08 UTC
svn commit: r594634 - in
/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer: LOCogroup.java
LOEval.java LOLoad.java LORead.java LOSort.java LOSplit.java LOStore.java
LOUnion.java LogicalOperator.java
Author: gates
Date: Tue Nov 13 12:40:07 2007
New Revision: 594634
URL: http://svn.apache.org/viewvc?rev=594634&view=rev
Log:
Cleaned up formatting in LogicalOperator tree.
Modified:
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOEval.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LORead.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=594634&r1=594633&r2=594634&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Tue Nov 13 12:40:07 2007
@@ -30,140 +30,150 @@
-public class LOCogroup extends LogicalOperator{
- private static final long serialVersionUID = 1L;
-
- protected ArrayList<EvalSpec> specs;
-
- public LOCogroup(List<LogicalOperator> inputs, ArrayList<EvalSpec> specs) {
- super(inputs);
+public class LOCogroup extends LogicalOperator {
+ private static final long serialVersionUID = 1L;
+
+ protected ArrayList<EvalSpec> specs;
+
+ public LOCogroup(List<LogicalOperator> inputs,
+ ArrayList<EvalSpec> specs) {
+ super(inputs);
this.specs = specs;
getOutputType();
}
-
- @Override
- public String name() {
- return "CoGroup";
- }
- @Override
- public String arguments() {
- StringBuffer sb = new StringBuffer();
-
+
+ @Override
+ public String name() {
+ return "CoGroup";
+ }
+ @Override
+ public String arguments() {
+ StringBuffer sb = new StringBuffer();
+
for (int i = 0; i < specs.size(); i++) {
sb.append(specs.get(i));
- if (i+1 < specs.size()) sb.append(", ");
+ if (i + 1 < specs.size())
+ sb.append(", ");
}
-
+
return sb.toString();
- }
-
- public static Datum[] getGroupAndTuple(Datum d){
- if (!(d instanceof Tuple)){
- throw new RuntimeException("Internal Error: Evaluation of group expression did not return a tuple");
- }
- Tuple output = (Tuple)d;
- if (output.arity() < 2){
- throw new RuntimeException("Internal Error: Evaluation of group expression returned a tuple with <2 fields");
- }
-
- Datum[] groupAndTuple = new Datum[2];
- try{
- if (output.arity() == 2){
- groupAndTuple[0] = output.getField(0);
- groupAndTuple[1] = output.getField(1);
- }else{
- Tuple group = new Tuple();
- for (int j=0; j<output.arity()-1; j++){
- group.appendField(output.getField(j));
- }
- groupAndTuple[0] = group;
- groupAndTuple[1] = output.getField(output.arity()-1);
- }
- }catch (IOException e){
- throw new RuntimeException(e);
- }
- return groupAndTuple;
- }
-
+ }
+
+ public static Datum[] getGroupAndTuple(Datum d) {
+ if (!(d instanceof Tuple)) {
+ throw new RuntimeException
+ ("Internal Error: Evaluation of group expression did not return a tuple");
+ }
+ Tuple output = (Tuple) d;
+ if (output.arity() < 2) {
+ throw new RuntimeException
+ ("Internal Error: Evaluation of group expression returned a tuple with <2 fields");
+ }
+
+ Datum[]groupAndTuple = new Datum[2];
+ try {
+ if (output.arity() == 2) {
+ groupAndTuple[0] = output.getField(0);
+ groupAndTuple[1] = output.getField(1);
+ } else {
+ Tuple group = new Tuple();
+ for (int j = 0; j < output.arity() - 1; j++) {
+ group.appendField(output.getField(j));
+ }
+ groupAndTuple[0] = group;
+ groupAndTuple[1] = output.getField(output.arity() - 1);
+ }
+ } catch(IOException e) {
+ throw new RuntimeException(e);
+ }
+ return groupAndTuple;
+ }
+
@Override
- public TupleSchema outputSchema() {
- if (schema == null){
- schema = new TupleSchema();
-
-
- Schema groupElementSchema = specs.get(0).getOutputSchemaForPipe(getInputs().get(0).outputSchema());
- if (groupElementSchema == null){
- groupElementSchema = new TupleSchema();
- groupElementSchema.setAlias("group");
- }else{
-
- if (!(groupElementSchema instanceof TupleSchema))
- throw new RuntimeException("Internal Error: Schema of group expression was atomic");
- List<Schema> fields = ((TupleSchema)groupElementSchema).getFields();
-
- if (fields.size() < 2)
- throw new RuntimeException("Internal Error: Schema of group expression retured <2 fields");
-
- if (fields.size() == 2){
- groupElementSchema = fields.get(0);
- groupElementSchema.removeAllAliases();
- groupElementSchema.setAlias("group");
- }else{
- groupElementSchema = new TupleSchema();
- groupElementSchema.setAlias("group");
-
- for (int i=0; i<fields.size()-1; i++){
- ((TupleSchema)groupElementSchema).add(fields.get(i));
- }
- }
-
- }
-
- schema.add(groupElementSchema);
-
- for (LogicalOperator lo : getInputs()) {
- TupleSchema inputSchema = lo.outputSchema();
- if (inputSchema == null)
- inputSchema = new TupleSchema();
- schema.add(inputSchema);
- }
- }
-
- schema.setAlias(alias);
+ public TupleSchema outputSchema() {
+ if (schema == null) {
+ schema = new TupleSchema();
+
+
+ Schema groupElementSchema =
+ specs.get(0).getOutputSchemaForPipe(getInputs().get(0).
+ outputSchema());
+ if (groupElementSchema == null) {
+ groupElementSchema = new TupleSchema();
+ groupElementSchema.setAlias("group");
+ } else {
+
+ if (!(groupElementSchema instanceof TupleSchema))
+ throw new RuntimeException
+ ("Internal Error: Schema of group expression was atomic");
+ List<Schema> fields =
+ ((TupleSchema) groupElementSchema).getFields();
+
+ if (fields.size() < 2)
+ throw new RuntimeException
+ ("Internal Error: Schema of group expression retured <2 fields");
+
+ if (fields.size() == 2) {
+ groupElementSchema = fields.get(0);
+ groupElementSchema.removeAllAliases();
+ groupElementSchema.setAlias("group");
+ } else {
+ groupElementSchema = new TupleSchema();
+ groupElementSchema.setAlias("group");
+
+ for (int i = 0; i < fields.size() - 1; i++) {
+ ((TupleSchema) groupElementSchema).add(fields.get(i));
+ }
+ }
+
+ }
+
+ schema.add(groupElementSchema);
+
+ for (LogicalOperator lo:getInputs()) {
+ TupleSchema inputSchema = lo.outputSchema();
+ if (inputSchema == null)
+ inputSchema = new TupleSchema();
+ schema.add(inputSchema);
+ }
+ }
+
+ schema.setAlias(alias);
return schema;
}
-
+
@Override
- public int getOutputType(){
- int outputType = FIXED;
- for (int i=0; i<getInputs().size(); i++){
- switch (getInputs().get(i).getOutputType()){
- case FIXED:
- continue;
- case MONOTONE:
- outputType = AMENDABLE;
- break;
- case AMENDABLE:
- default:
- throw new RuntimeException("Can't feed a cogroup into another in the streaming case");
- }
- }
- return outputType;
+ public int getOutputType() {
+ int outputType = FIXED;
+ for (int i = 0; i < getInputs().size(); i++) {
+ switch (getInputs().get(i).getOutputType()) {
+ case FIXED:
+ continue;
+ case MONOTONE:
+ outputType = AMENDABLE;
+ break;
+ case AMENDABLE:
+ default:
+ throw new RuntimeException
+ ("Can't feed a cogroup into another in the streaming case");
+ }
+ }
+ return outputType;
}
@Override
- public List<String> getFuncs() {
+ public List<String> getFuncs() {
List<String> funcs = super.getFuncs();
- for (EvalSpec spec: specs) {
+ for (EvalSpec spec:specs) {
funcs.addAll(spec.getFuncs());
}
return funcs;
}
- public ArrayList<EvalSpec> getSpecs() {
- return specs;
- }
+ public ArrayList<EvalSpec> getSpecs() {
+ return specs;
+ }
+
-
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOEval.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOEval.java?rev=594634&r1=594633&r2=594634&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOEval.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOEval.java Tue Nov 13 12:40:07 2007
@@ -24,34 +24,36 @@
-public class LOEval extends LogicalOperator{
- private static final long serialVersionUID = 1L;
-
- protected EvalSpec spec;
+public class LOEval extends LogicalOperator {
+ private static final long serialVersionUID = 1L;
+
+ protected EvalSpec spec;
public LOEval(LogicalOperator input, EvalSpec specIn) {
- super(input);
+ super(input);
spec = specIn;
getOutputType();
}
@Override
- public String name() {
+ public String name() {
return "Eval";
}
@Override
- public String arguments() {
+ public String arguments() {
return spec.toString();
}
@Override
- public TupleSchema outputSchema() {
+ public TupleSchema outputSchema() {
if (schema == null) {
//System.out.println("LOEval input: " + inputs[0].outputSchema());
//System.out.println("LOEval spec: " + spec);
- schema = (TupleSchema)spec.getOutputSchemaForPipe(getInputs().get(0).outputSchema());
-
+ schema =
+ (TupleSchema) spec.getOutputSchemaForPipe(getInputs().get(0).
+ outputSchema());
+
//System.out.println("LOEval output: " + schema);
}
schema.setAlias(alias);
@@ -59,7 +61,7 @@
}
@Override
- public int getOutputType() {
+ public int getOutputType() {
switch (getInputs().get(0).getOutputType()) {
case FIXED:
return FIXED;
@@ -72,13 +74,13 @@
}
@Override
- public List<String> getFuncs() {
+ public List<String> getFuncs() {
List<String> funcs = super.getFuncs();
funcs.addAll(spec.getFuncs());
return funcs;
}
- public EvalSpec getSpec() {
- return spec;
- }
+ public EvalSpec getSpec() {
+ return spec;
+ }
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=594634&r1=594633&r2=594634&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java Tue Nov 13 12:40:07 2007
@@ -29,70 +29,67 @@
public class LOLoad extends LogicalOperator {
- private static final long serialVersionUID = 1L;
-
- protected FileSpec inputFileSpec;
-
- protected int outputType = FIXED;
-
-
- public LOLoad(FileSpec inputFileSpec) throws IOException, ParseException{
- super();
- this.inputFileSpec = inputFileSpec;
- try
- {
- LoadFunc storageFunc = (LoadFunc) PigContext.instantiateFuncFromSpec(inputFileSpec.getFuncSpec());
- }
- catch (IOException e)
- {
- Throwable cause = e.getCause();
- while (cause != null && cause.getClass().getName() != "java.lang.ClassNotFoundException")
- {
- System.out.println("cause = " + cause.getClass().getName());
- cause = cause.getCause();
- }
-
- if (cause != null)
- {
- throw new ParseException("Load function " + inputFileSpec.getFuncSpec() + " not found");
- }
- else
- {
- throw e;
- }
-
- }
+ private static final long serialVersionUID = 1L;
+
+ protected FileSpec inputFileSpec;
+
+ protected int outputType = FIXED;
+
+
+ public LOLoad(FileSpec inputFileSpec) throws IOException, ParseException {
+ super();
+ this.inputFileSpec = inputFileSpec;
+ try {
+ LoadFunc storageFunc =
+ (LoadFunc) PigContext.instantiateFuncFromSpec(inputFileSpec.
+ getFuncSpec());
+ } catch(IOException e) {
+ Throwable cause = e.getCause();
+ while (cause != null
+ && cause.getClass().getName() !=
+ "java.lang.ClassNotFoundException") {
+ System.out.println("cause = " + cause.getClass().getName());
+ cause = cause.getCause();
+ } if (cause != null) {
+ throw new ParseException("Load function " +
+ inputFileSpec.getFuncSpec() +
+ " not found");
+ } else {
+ throw e;
+ }
+
+ }
//TODO: Handle Schemas defined by Load Functions
schema = new TupleSchema();
}
@Override
- public String name() {
+ public String name() {
return "Load";
}
-
- public FileSpec getInputFileSpec(){
- return inputFileSpec;
+
+ public FileSpec getInputFileSpec() {
+ return inputFileSpec;
}
-
+
public void setInputFileSpec(FileSpec spec) {
- inputFileSpec = spec;
+ inputFileSpec = spec;
}
-
- @Override
- public String arguments() {
- return inputFileSpec.toString();
+
+ @Override
+ public String arguments() {
+ return inputFileSpec.toString();
}
@Override
- public TupleSchema outputSchema() {
- schema.setAlias(alias);
+ public TupleSchema outputSchema() {
+ schema.setAlias(alias);
return this.schema;
}
@Override
- public int getOutputType() {
+ public int getOutputType() {
return outputType;
}
@@ -104,16 +101,16 @@
}
@Override
- public String toString() {
- StringBuffer result = new StringBuffer(super.toString());
- result.append(" (outputType: ");
- result.append(outputType);
- result.append(')');
- return result.toString();
- }
+ public String toString() {
+ StringBuffer result = new StringBuffer(super.toString());
+ result.append(" (outputType: ");
+ result.append(outputType);
+ result.append(')');
+ return result.toString();
+ }
- @Override
- public List<String> getFuncs() {
+ @Override
+ public List<String> getFuncs() {
List<String> funcs = super.getFuncs();
funcs.add(inputFileSpec.getFuncName());
return funcs;
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LORead.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LORead.java?rev=594634&r1=594633&r2=594634&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LORead.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LORead.java Tue Nov 13 12:40:07 2007
@@ -24,78 +24,75 @@
public class LORead extends LogicalOperator {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
-
- protected IntermedResult readFrom = null;
- boolean readsFromSplit = false;
-
- @Override
- public String toString() {
- StringBuffer result = new StringBuffer(super.toString());
- result.append(" (readsFromSplit: ");
- result.append(readsFromSplit);
- result.append(')');
- return result.toString();
- }
+ protected IntermedResult readFrom = null;
+ boolean readsFromSplit = false;
+ @Override
+ public String toString() {
+ StringBuffer result = new StringBuffer(super.toString());
+ result.append(" (readsFromSplit: ");
+ result.append(readsFromSplit);
+ result.append(')');
+ return result.toString();
+ }
+ //Since intermed result may have multiple outputs, which output do I read?
+ public int splitOutputToRead = 0;
- //Since intermed result may have multiple outputs, which output do I read?
-
- public int splitOutputToRead = 0;
-
- public LORead(IntermedResult readFromIn) {
- super();
+ public LORead(IntermedResult readFromIn) {
+ super();
readFrom = readFromIn;
- }
-
- public LORead(IntermedResult readFromIn, int outputToRead) {
- super();
- readsFromSplit = true;
- this.splitOutputToRead = outputToRead;
+ }
+
+ public LORead(IntermedResult readFromIn, int outputToRead) {
+ super();
+ readsFromSplit = true;
+ this.splitOutputToRead = outputToRead;
readFrom = readFromIn;
- }
-
- public boolean readsFromSplit(){
- return readsFromSplit;
- }
-
- @Override
- public String name() {
- return "Read";
- }
- @Override
- public String arguments() {
- return alias;
- }
-
+ }
+
+ public boolean readsFromSplit() {
+ return readsFromSplit;
+ }
+
+ @Override
+ public String name() {
+ return "Read";
+ }
@Override
- public TupleSchema outputSchema() {
- if (schema == null) {
- if (readFrom.lp != null && readFrom.lp.root != null && readFrom.lp.root.outputSchema() != null) {
+ public String arguments() {
+ return alias;
+ }
+
+ @Override
+ public TupleSchema outputSchema() {
+ if (schema == null) {
+ if (readFrom.lp != null && readFrom.lp.root != null
+ && readFrom.lp.root.outputSchema() != null) {
schema = readFrom.lp.root.outputSchema().copy();
} else {
schema = new TupleSchema();
}
- }
-
- schema.removeAllAliases();
+ }
+
+ schema.removeAllAliases();
schema.setAlias(alias);
-
+
return schema;
}
-
-
-
- @Override
- public int getOutputType(){
- return readFrom.getOutputType();
- }
-
- public IntermedResult getReadFrom() {
- return readFrom;
- }
+
+
+
+ @Override
+ public int getOutputType() {
+ return readFrom.getOutputType();
+ }
+
+ public IntermedResult getReadFrom() {
+ return readFrom;
+ }
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=594634&r1=594633&r2=594634&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java Tue Nov 13 12:40:07 2007
@@ -15,64 +15,63 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.pig.impl.logicalLayer;
-
-
+package org.apache.pig.impl.logicalLayer;
+
+
import org.apache.pig.impl.eval.EvalSpec;
import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
-
-public class LOSort extends LogicalOperator {
- private static final long serialVersionUID = 1L;
- private EvalSpec sortSpec;
-
-
- protected EvalSpec spec;
-
- public EvalSpec getSpec() {
- return spec;
- }
-
-
-
- public LOSort( LogicalOperator input, EvalSpec sortSpec){
- super(input);
- this.sortSpec = sortSpec;
- getOutputType();
- }
-
- @Override
- public String name() {
- return "SORT";
- }
-
- @Override
- public String arguments() {
- return sortSpec.toString();
- }
-
- @Override
- public int getOutputType() {
- switch(getInputs().get(0).getOutputType()){
- case FIXED:
- return FIXED;
- default:
- throw new RuntimeException("Blocking operator such as sort cannot handle streaming input");
- }
- }
-
- @Override
- public TupleSchema outputSchema() {
- if (schema== null)
- schema = getInputs().get(0).outputSchema().copy();
-
- schema.setAlias(alias);
- return schema;
-
- }
-
- public EvalSpec getSortSpec() {
- return sortSpec;
- }
-
-}
+
+public class LOSort extends LogicalOperator {
+ private static final long serialVersionUID = 1L;
+ private EvalSpec sortSpec;
+
+
+ protected EvalSpec spec;
+
+ public EvalSpec getSpec() {
+ return spec;
+ }
+
+ public LOSort(LogicalOperator input, EvalSpec sortSpec) {
+ super(input);
+ this.sortSpec = sortSpec;
+ getOutputType();
+ }
+
+ @Override
+ public String name() {
+ return "SORT";
+ }
+
+ @Override
+ public String arguments() {
+ return sortSpec.toString();
+ }
+
+ @Override
+ public int getOutputType() {
+ switch (getInputs().get(0).getOutputType()) {
+ case FIXED:
+ return FIXED;
+ default:
+ throw new RuntimeException
+ ("Blocking operator such as sort cannot handle streaming input");
+ }
+ }
+
+ @Override
+ public TupleSchema outputSchema() {
+ if (schema == null)
+ schema = getInputs().get(0).outputSchema().copy();
+
+ schema.setAlias(alias);
+ return schema;
+
+ }
+
+ public EvalSpec getSortSpec() {
+ return sortSpec;
+ }
+
+}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java?rev=594634&r1=594633&r2=594634&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java Tue Nov 13 12:40:07 2007
@@ -15,41 +15,41 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.pig.impl.logicalLayer;
-
-import java.util.ArrayList;
-import java.util.List;
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.ArrayList;
+import java.util.List;
import org.apache.pig.impl.eval.cond.Cond;
import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
-
-
-public class LOSplit extends LogicalOperator {
- private static final long serialVersionUID = 1L;
-
- List<Cond> conditions = new ArrayList<Cond>();
-
- public LOSplit(LogicalOperator input){
- super(input);
- }
-
- public void addCond(Cond cond){
- conditions.add(cond);
- }
-
- @Override
- public int getOutputType(){
- return getInputs().get(0).getOutputType();
- }
-
- public ArrayList<Cond> getConditions(){
- return new ArrayList<Cond>(conditions);
- }
-
- @Override
- public TupleSchema outputSchema(){
- return getInputs().get(0).outputSchema().copy();
- }
-
-
-}
+
+
+public class LOSplit extends LogicalOperator {
+ private static final long serialVersionUID = 1L;
+
+ List<Cond> conditions = new ArrayList<Cond>();
+
+ public LOSplit(LogicalOperator input) {
+ super(input);
+ }
+
+ public void addCond(Cond cond) {
+ conditions.add(cond);
+ }
+
+ @Override
+ public int getOutputType() {
+ return getInputs().get(0).getOutputType();
+ }
+
+ public ArrayList<Cond> getConditions() {
+ return new ArrayList<Cond> (conditions);
+ }
+
+ @Override
+ public TupleSchema outputSchema() {
+ return getInputs().get(0).outputSchema().copy();
+ }
+
+
+}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=594634&r1=594633&r2=594634&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java Tue Nov 13 12:40:07 2007
@@ -27,59 +27,62 @@
public class LOStore extends LogicalOperator {
- private static final long serialVersionUID = 1L;
-
- protected FileSpec outputFileSpec;
+ private static final long serialVersionUID = 1L;
- protected boolean append;
-
+ protected FileSpec outputFileSpec;
- public LOStore(LogicalOperator input, FileSpec fileSpec, boolean append) throws IOException{
- super(input);
+ protected boolean append;
+
+
+ public LOStore(LogicalOperator input,
+ FileSpec fileSpec,
+ boolean append) throws IOException {
+ super(input);
this.outputFileSpec = fileSpec;
this.append = append;
//See if the store function spec is valid
- try{
- StoreFunc StoreFunc = (StoreFunc) PigContext.instantiateFuncFromSpec(fileSpec.getFuncSpec());
- }catch (Exception e){
- IOException ioe = new IOException(e.getMessage());
- ioe.setStackTrace(e.getStackTrace());
- throw ioe;
- }
-
- getOutputType();
+ try {
+ StoreFunc StoreFunc =
+ (StoreFunc) PigContext.instantiateFuncFromSpec(
+ fileSpec.getFuncSpec());
+ } catch(Exception e) {
+ IOException ioe = new IOException(e.getMessage());
+ ioe.setStackTrace(e.getStackTrace());
+ throw ioe;
+ } getOutputType();
+ }
+
+
+ public FileSpec getOutputFileSpec() {
+ return outputFileSpec;
}
-
-
- public FileSpec getOutputFileSpec(){
- return outputFileSpec;
- }
-
-
- @Override
- public String toString() {
-
- StringBuffer result = new StringBuffer(super.toString());
- result.append(" (append: ");
- result.append(append);
- result.append(')');
- return result.toString();
- }
- @Override
- public String name() {
+ @Override
+ public String toString() {
+ StringBuffer result = new StringBuffer(super.toString());
+ result.append(" (append: ");
+ result.append(append);
+ result.append(')');
+ return result.toString();
+ }
+
+
+ @Override
+ public String name() {
return "Store";
}
@Override
- public TupleSchema outputSchema() {
- throw new RuntimeException("Internal error: Asking for schema of a store operator.");
+ public TupleSchema outputSchema() {
+ throw new
+ RuntimeException
+ ("Internal error: Asking for schema of a store operator.");
}
@Override
- public int getOutputType() {
+ public int getOutputType() {
switch (getInputs().get(0).getOutputType()) {
case FIXED:
return FIXED;
@@ -91,14 +94,14 @@
}
@Override
- public List<String> getFuncs() {
+ public List<String> getFuncs() {
List<String> funcs = super.getFuncs();
funcs.add(outputFileSpec.getFuncName());
return funcs;
}
- public boolean isAppend() {
- return append;
- }
+ public boolean isAppend() {
+ return append;
+ }
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=594634&r1=594633&r2=594634&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java Tue Nov 13 12:40:07 2007
@@ -24,52 +24,55 @@
public class LOUnion extends LogicalOperator {
- private static final long serialVersionUID = 1L;
-
+ private static final long serialVersionUID = 1L;
+
public LOUnion(List<LogicalOperator> inputsIn) {
- super(inputsIn);
+ super(inputsIn);
}
-
+
@Override
- public String name() {
+ public String name() {
return "Union";
}
@Override
- public TupleSchema outputSchema() {
- if (schema == null){
- TupleSchema longest = getInputs().get(0).outputSchema();
- int current = 0;
- for (LogicalOperator lo : getInputs()) {
- if (lo != null && lo.outputSchema() != null && lo.outputSchema().numFields() > current) {
- longest = lo.outputSchema();
- current = longest.numFields();
- }
- }
- schema = longest.copy();
- }
-
- schema.setAlias(alias);
+ public TupleSchema outputSchema() {
+ if (schema == null) {
+ TupleSchema longest = getInputs().get(0).outputSchema();
+ int current = 0;
+ for (LogicalOperator lo:getInputs()) {
+ if (lo != null && lo.outputSchema() != null
+ && lo.outputSchema().numFields() > current) {
+ longest = lo.outputSchema();
+ current = longest.numFields();
+ }
+ }
+ schema = longest.copy();
+ }
+
+ schema.setAlias(alias);
return schema;
}
-
- @Override
- public int getOutputType(){
- int outputType = FIXED;
- for (int i=0; i<getInputs().size(); i++){
- switch (getInputs().get(i).getOutputType()){
- case FIXED:
- continue;
- case MONOTONE:
- if (outputType == FIXED)
- outputType = MONOTONE;
- continue;
- case AMENDABLE:
- outputType = AMENDABLE;
- default:
- throw new RuntimeException("Invalid input type to the UNION operator");
- }
- }
- return outputType;
- }
+
+ @Override
+ public int getOutputType() {
+ int outputType = FIXED;
+ for (int i = 0; i < getInputs().size(); i++) {
+ switch (getInputs().get(i).getOutputType()) {
+ case FIXED:
+ continue;
+ case MONOTONE:
+ if (outputType == FIXED)
+ outputType = MONOTONE;
+ continue;
+ case AMENDABLE:
+ outputType = AMENDABLE;
+ default:
+ throw new
+ RuntimeException
+ ("Invalid input type to the UNION operator");
+ }
+ }
+ return outputType;
+ }
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=594634&r1=594633&r2=594634&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java Tue Nov 13 12:40:07 2007
@@ -27,58 +27,55 @@
abstract public class LogicalOperator implements Serializable {
- public String alias = null;
-
- public static final int FIXED = 1;
- public static final int MONOTONE = 2;
- public static final int UPDATABLE = 3; // Reserved for future use
- public static final int AMENDABLE = 4;
-
- protected int requestedParallelism = -1;
- protected TupleSchema schema = null;
- protected List<LogicalOperator> inputs;
-
- protected LogicalOperator(){
- this.inputs = new ArrayList<LogicalOperator>();
- }
-
- protected LogicalOperator(List<LogicalOperator> inputs) {
- this.inputs = inputs;
- }
-
- protected LogicalOperator(LogicalOperator input) {
- this.inputs = new ArrayList<LogicalOperator>();
- inputs.add(input);
- }
-
- public String getAlias() {
- return alias;
- }
-
- public void setAlias(String newAlias) {
- alias = newAlias;
- }
-
- public int getRequestedParallelism() {
- return requestedParallelism;
- }
-
- public void setRequestedParallelism(int newRequestedParallelism) {
- requestedParallelism = newRequestedParallelism;
- }
-
- @Override
- public String toString() {
- StringBuffer result = new StringBuffer(super.toString());
- result.append(" (alias: ");
- result.append(alias);
- result.append(", requestedParallelism: ");
- result.append(requestedParallelism);
- result.append(')');
- return result.toString();
- }
+ public String alias = null;
- public abstract TupleSchema outputSchema();
+ public static final int FIXED = 1;
+ public static final int MONOTONE = 2;
+ public static final int UPDATABLE = 3; // Reserved for future use
+ public static final int AMENDABLE = 4;
+
+ protected int requestedParallelism = -1;
+ protected TupleSchema schema = null;
+ protected List < LogicalOperator > inputs;
+
+ protected LogicalOperator() {
+ this.inputs = new ArrayList < LogicalOperator > ();
+ } protected LogicalOperator(List < LogicalOperator > inputs) {
+ this.inputs = inputs;
+ }
+
+ protected LogicalOperator(LogicalOperator input) {
+ this.inputs = new ArrayList < LogicalOperator > ();
+ inputs.add(input);
+ }
+
+ public String getAlias() {
+ return alias;
+ }
+
+ public void setAlias(String newAlias) {
+ alias = newAlias;
+ }
+
+ public int getRequestedParallelism() {
+ return requestedParallelism;
+ }
+
+ public void setRequestedParallelism(int newRequestedParallelism) {
+ requestedParallelism = newRequestedParallelism;
+ }
+
+ @Override public String toString() {
+ StringBuffer result = new StringBuffer(super.toString());
+ result.append(" (alias: ");
+ result.append(alias);
+ result.append(", requestedParallelism: ");
+ result.append(requestedParallelism);
+ result.append(')');
+ return result.toString();
+ }
+
+ public abstract TupleSchema outputSchema();
public String name() {
return "ROOT";
@@ -99,10 +96,10 @@
}
return funcs;
}
-
+
public abstract int getOutputType();
- public void setSchema(TupleSchema schema) {
- this.schema = schema;
- }
+ public void setSchema(TupleSchema schema) {
+ this.schema = schema;
+ }
}