You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2010/12/23 02:33:45 UTC
svn commit: r1052127 [3/3] - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/
src/org/apache/pig/backend/hadoop/executionengine/physi...
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Thu Dec 23 01:33:44 2010
@@ -52,37 +52,37 @@ import org.apache.pig.pen.util.ExampleTu
public class POLocalRearrange extends PhysicalOperator {
/**
- *
+ *
*/
protected static final long serialVersionUID = 1L;
protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
private static Log log = LogFactory.getLog(POLocalRearrange.class);
-
+
private static final Result ERR_RESULT = new Result();
protected List<PhysicalPlan> plans;
-
+
protected List<PhysicalPlan> secondaryPlans;
-
+
protected List<ExpressionOperator> leafOps;
-
+
protected List<ExpressionOperator> secondaryLeafOps;
// The position of this LR in the package operator
protected byte index;
-
+
protected byte keyType;
-
+
protected byte mainKeyType;
-
+
protected byte secondaryKeyType;
protected boolean mIsDistinct = false;
-
+
protected boolean isCross = false;
-
+
// map to store mapping of projected columns to
// the position in the "Key" where these will be projected to.
// We use this information to strip off these columns
@@ -93,8 +93,8 @@ public class POLocalRearrange extends Ph
// For the first input (a), the map would contain following key:value
// 2:0 (2 corresponds to $2 in cogroup a by ($2, $3) and 0 corresponds to 1st index in key)
// 3:1 (3 corresponds to $3 in cogroup a by ($2, $3) and 0 corresponds to 2nd index in key)
- private Map<Integer, Integer> mProjectedColsMap;
- private Map<Integer, Integer> mSecondaryProjectedColsMap;
+ private final Map<Integer, Integer> mProjectedColsMap;
+ private final Map<Integer, Integer> mSecondaryProjectedColsMap;
// A place holder Tuple used in distinct case where we really don't
// have any value to pass through. But hadoop gets cranky if we pass a
@@ -119,12 +119,12 @@ public class POLocalRearrange extends Ph
private int mProjectedColsMapSize = 0;
private int mSecondaryProjectedColsMapSize = 0;
-
+
private boolean useSecondaryKey = false;
-
+
// By default, we strip keys from the value.
private boolean stripKeyFromValue = true;
-
+
public POLocalRearrange(OperatorKey k) {
this(k, -1, null);
}
@@ -175,9 +175,9 @@ public class POLocalRearrange extends Ph
/**
* Sets the co-group index of this operator
- *
- * @param index the position of this operator in
- * a co-group operation
+ *
+ * @param index the position of this operator in
+ * a co-group operation
* @throws ExecException if the index value is bigger then 0x7F
*/
public void setIndex(int index) throws ExecException {
@@ -186,7 +186,7 @@ public class POLocalRearrange extends Ph
/**
* Sets the multi-query index of this operator
- *
+ *
* @param index the position of the parent plan of this operator
* in the enclosed split operator
* @throws ExecException if the index value is bigger then 0x7F
@@ -194,19 +194,19 @@ public class POLocalRearrange extends Ph
public void setMultiQueryIndex(int index) throws ExecException {
setIndex(index, true);
}
-
+
private void setIndex(int index, boolean multiQuery) throws ExecException {
- if (index > PigNullableWritable.idxSpace) {
+ if (index > PigNullableWritable.idxSpace) {
// indices in group and cogroup should only
// be in the range 0x00 to 0x7F (only 127 possible
// inputs)
int errCode = 1082;
- String msg = multiQuery?
+ String msg = multiQuery?
"Merge more than 127 map-reduce jobs not supported."
: "Cogroups with more than 127 inputs not supported.";
throw new ExecException(msg, errCode, PigException.INPUT);
} else {
- // We could potentially be sending the (key, value) relating to
+ // We could potentially be sending the (key, value) relating to
// multiple "group by" statements through one map reduce job
// in multiquery optimized execution. In this case, we want
// two keys which have the same content but coming from different
@@ -220,10 +220,10 @@ public class POLocalRearrange extends Ph
// contents coming from different "group by" operations would have different
// indices and hence would go to different invocation of reduce()
this.index = multiQuery ? (byte)(index | PigNullableWritable.mqFlag) : (byte)index;
- }
+ }
}
-
- public boolean isDistinct() {
+
+ public boolean isDistinct() {
return mIsDistinct;
}
@@ -233,7 +233,7 @@ public class POLocalRearrange extends Ph
mFakeTuple = mTupleFactory.newTuple();
}
}
-
+
/**
* Overridden since the attachment of the new input should cause the old
* processing to end.
@@ -242,7 +242,7 @@ public class POLocalRearrange extends Ph
public void attachInput(Tuple t) {
super.attachInput(t);
}
-
+
/**
* Calls getNext on the generate operator inside the nested
* physical plan. Converts the generated tuple into the proper
@@ -250,143 +250,111 @@ public class POLocalRearrange extends Ph
*/
@Override
public Result getNext(Tuple t) throws ExecException {
-
+
Result inp = null;
Result res = ERR_RESULT;
while (true) {
inp = processInput();
- if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
+ if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR) {
break;
- if (inp.returnStatus == POStatus.STATUS_NULL)
+ }
+ if (inp.returnStatus == POStatus.STATUS_NULL) {
continue;
-
+ }
+
for (PhysicalPlan ep : plans) {
ep.attachInput((Tuple)inp.result);
}
-
+
List<Result> resLst = new ArrayList<Result>();
-
+
if (secondaryPlans!=null) {
for (PhysicalPlan ep : secondaryPlans) {
ep.attachInput((Tuple)inp.result);
}
}
-
+
List<Result> secondaryResLst = null;
- if (secondaryLeafOps!=null)
+ if (secondaryLeafOps!=null) {
secondaryResLst = new ArrayList<Result>();
-
+ }
+
for (ExpressionOperator op : leafOps){
-
+
switch(op.getResultType()){
case DataType.BAG:
- res = op.getNext(dummyBag);
- break;
case DataType.BOOLEAN:
- res = op.getNext(dummyBool);
- break;
case DataType.BYTEARRAY:
- res = op.getNext(dummyDBA);
- break;
case DataType.CHARARRAY:
- res = op.getNext(dummyString);
- break;
case DataType.DOUBLE:
- res = op.getNext(dummyDouble);
- break;
case DataType.FLOAT:
- res = op.getNext(dummyFloat);
- break;
case DataType.INTEGER:
- res = op.getNext(dummyInt);
- break;
case DataType.LONG:
- res = op.getNext(dummyLong);
- break;
case DataType.MAP:
- res = op.getNext(dummyMap);
- break;
case DataType.TUPLE:
- res = op.getNext(dummyTuple);
+ res = op.getNext(getDummy(op.getResultType()), op.getResultType());
break;
default:
log.error("Invalid result type: " + DataType.findType(op.getResultType()));
break;
}
-
+
// allow null as group by key
if (res.returnStatus != POStatus.STATUS_OK && res.returnStatus != POStatus.STATUS_NULL) {
return new Result();
}
-
+
resLst.add(res);
}
-
+
if (secondaryLeafOps!=null)
{
for (ExpressionOperator op : secondaryLeafOps){
-
+
switch(op.getResultType()){
case DataType.BAG:
- res = op.getNext(dummyBag);
- break;
case DataType.BOOLEAN:
- res = op.getNext(dummyBool);
- break;
case DataType.BYTEARRAY:
- res = op.getNext(dummyDBA);
- break;
case DataType.CHARARRAY:
- res = op.getNext(dummyString);
- break;
case DataType.DOUBLE:
- res = op.getNext(dummyDouble);
- break;
case DataType.FLOAT:
- res = op.getNext(dummyFloat);
- break;
case DataType.INTEGER:
- res = op.getNext(dummyInt);
- break;
case DataType.LONG:
- res = op.getNext(dummyLong);
- break;
case DataType.MAP:
- res = op.getNext(dummyMap);
- break;
case DataType.TUPLE:
- res = op.getNext(dummyTuple);
+ res = op.getNext(getDummy(op.getResultType()), op.getResultType());
break;
default:
log.error("Invalid result type: " + DataType.findType(op.getResultType()));
break;
}
-
+
// allow null as group by key
if (res.returnStatus != POStatus.STATUS_OK && res.returnStatus != POStatus.STATUS_NULL) {
return new Result();
}
-
+
secondaryResLst.add(res);
}
}
-
+
// If we are using secondary sort key, our new key is:
- // (nullable, index, (key, secondary key), value)
- res.result = constructLROutput(resLst,secondaryResLst,(Tuple)inp.result);
+ // (nullable, index, (key, secondary key), value)
+ res.result = constructLROutput(resLst,secondaryResLst,(Tuple)inp.result);
res.returnStatus = POStatus.STATUS_OK;
-
+
detachPlans(plans);
- if(secondaryPlans != null)
+ if(secondaryPlans != null) {
detachPlans(secondaryPlans);
-
+ }
+
res.result = illustratorMarkup(inp.result, res.result, 0);
return res;
}
return inp;
}
-
+
private void detachPlans(List<PhysicalPlan> plans) {
for (PhysicalPlan ep : plans) {
ep.detachInput();
@@ -398,50 +366,52 @@ public class POLocalRearrange extends Ph
if(resLst.size()>1){
Tuple t = mTupleFactory.newTuple(resLst.size());
int i=-1;
- for(Result res : resLst)
+ for(Result res : resLst) {
t.set(++i, res.result);
- key = t;
+ }
+ key = t;
} else if (resLst.size() == 1 && type == DataType.TUPLE) {
-
+
// We get here after merging multiple jobs that have different
// map key types into a single job during multi-query optimization.
// If the key isn't a tuple, it must be wrapped in a tuple.
Object obj = resLst.get(0).result;
if (obj instanceof Tuple) {
- key = (Tuple)obj;
+ key = obj;
} else {
Tuple t = mTupleFactory.newTuple(1);
t.set(0, resLst.get(0).result);
key = t;
- }
+ }
}
else{
key = resLst.get(0).result;
}
return key;
}
-
+
protected Tuple constructLROutput(List<Result> resLst, List<Result> secondaryResLst, Tuple value) throws ExecException{
Tuple lrOutput = mTupleFactory.newTuple(3);
lrOutput.set(0, Byte.valueOf(this.index));
//Construct key
Object key;
Object secondaryKey=null;
-
+
if (secondaryResLst!=null && secondaryResLst.size()>0)
{
key = getKeyFromResult(resLst, mainKeyType);
secondaryKey = getKeyFromResult(secondaryResLst, secondaryKeyType);
- } else
+ } else {
key = getKeyFromResult(resLst, keyType);
-
+ }
+
if(!stripKeyFromValue){
lrOutput.set(1, key);
lrOutput.set(2, value);
return lrOutput;
}
-
+
if (mIsDistinct) {
//Put the key and the indexed tuple
@@ -453,9 +423,10 @@ public class POLocalRearrange extends Ph
lrOutput.set(2, mFakeTuple);
return lrOutput;
} else if(isCross){
-
- for(int i=0;i<plans.size();i++)
+
+ for(int i=0;i<plans.size();i++) {
value.getAll().remove(0);
+ }
//Put the index, key, and value
//in a tuple and return
lrOutput.set(1, key);
@@ -471,11 +442,11 @@ public class POLocalRearrange extends Ph
compoundKey.set(0, key);
compoundKey.set(1, secondaryKey);
lrOutput.set(1, compoundKey);
- }
- else
+ } else {
lrOutput.set(1, key);
-
- // strip off the columns in the "value" which
+ }
+
+ // strip off the columns in the "value" which
// are present in the "key"
if(mProjectedColsMapSize != 0 || mProjectStar == true) {
@@ -498,17 +469,17 @@ public class POLocalRearrange extends Ph
// the "value" since all elements are in the
// "key"
minimalValue = mTupleFactory.newTuple(0);
-
+
}
lrOutput.set(2, minimalValue);
-
+
} else {
-
+
// there were no columns in the "key"
// which we can strip off from the "value"
// so just send the value we got
lrOutput.set(2, value);
-
+
}
return lrOutput;
}
@@ -519,16 +490,17 @@ public class POLocalRearrange extends Ph
}
public void setKeyType(byte keyType) {
- if (useSecondaryKey)
+ if (useSecondaryKey) {
this.mainKeyType = keyType;
- else
+ } else {
this.keyType = keyType;
+ }
}
public List<PhysicalPlan> getPlans() {
return plans;
}
-
+
public void setUseSecondaryKey(boolean useSecondaryKey) {
this.useSecondaryKey = useSecondaryKey;
mainKeyType = keyType;
@@ -539,12 +511,12 @@ public class POLocalRearrange extends Ph
leafOps.clear();
int keyIndex = 0; // zero based index for fields in the key
for (PhysicalPlan plan : plans) {
- ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0);
+ ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0);
leafOps.add(leaf);
-
+
// don't optimize CROSS
if(!isCross) {
- // Look for the leaf Ops which are POProject operators - get the
+ // Look for the leaf Ops which are POProject operators - get the
// the columns that these POProject Operators are projecting.
// They MUST be projecting either a column or '*'.
// Keep track of the columns which are being projected and
@@ -570,16 +542,18 @@ public class POLocalRearrange extends Ph
} else {
try {
List<PhysicalOperator> preds = plan.getPredecessors(leaf);
- if (preds==null || !(preds.get(0) instanceof POProject))
+ if (preds==null || !(preds.get(0) instanceof POProject)) {
mProjectedColsMap.put(project.getColumn(), keyIndex);
+ }
} catch (ExecException e) {
int errCode = 2070;
String msg = "Problem in accessing column from project operator.";
throw new PlanException(msg, errCode, PigException.BUG);
}
}
- if(project.getResultType() == DataType.TUPLE)
+ if(project.getResultType() == DataType.TUPLE) {
isKeyTuple = true;
+ }
}
keyIndex++;
}
@@ -593,18 +567,18 @@ public class POLocalRearrange extends Ph
}
mProjectedColsMapSize = mProjectedColsMap.size();
}
-
+
public void setSecondaryPlans(List<PhysicalPlan> plans) throws PlanException {
this.secondaryPlans = plans;
secondaryLeafOps.clear();
int keyIndex = 0; // zero based index for fields in the key
for (PhysicalPlan plan : plans) {
- ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0);
+ ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0);
secondaryLeafOps.add(leaf);
-
+
// don't optimize CROSS
if(!isCross) {
- // Look for the leaf Ops which are POProject operators - get the
+ // Look for the leaf Ops which are POProject operators - get the
// the columns that these POProject Operators are projecting.
// They MUST be projecting either a column or '*'.
// Keep track of the columns which are being projected and
@@ -630,16 +604,18 @@ public class POLocalRearrange extends Ph
} else {
try {
List<PhysicalOperator> preds = plan.getPredecessors(leaf);
- if (preds==null || !(preds.get(0) instanceof POProject))
+ if (preds==null || !(preds.get(0) instanceof POProject)) {
mSecondaryProjectedColsMap.put(project.getColumn(), keyIndex);
+ }
} catch (ExecException e) {
int errCode = 2070;
String msg = "Problem in accessing column from project operator.";
throw new PlanException(msg, errCode, PigException.BUG);
}
}
- if(project.getResultType() == DataType.TUPLE)
+ if(project.getResultType() == DataType.TUPLE) {
isSecondaryKeyTuple = true;
+ }
}
keyIndex++;
}
@@ -653,9 +629,9 @@ public class POLocalRearrange extends Ph
}
mainKeyType = keyType;
keyType = DataType.TUPLE;
- if (plans.size()>1)
+ if (plans.size()>1) {
secondaryKeyType = DataType.TUPLE;
- else
+ } else
{
secondaryKeyType = plans.get(0).getLeaves().get(0).getResultType();
}
@@ -663,7 +639,7 @@ public class POLocalRearrange extends Ph
}
/**
- * Make a deep copy of this operator.
+ * Make a deep copy of this operator.
* @throws CloneNotSupportedException
*/
@Override
@@ -674,7 +650,7 @@ public class POLocalRearrange extends Ph
clonePlans.add(plan.clone());
}
POLocalRearrange clone = new POLocalRearrange(new OperatorKey(
- mKey.scope,
+ mKey.scope,
NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
requestedParallelism);
try {
@@ -710,7 +686,7 @@ public class POLocalRearrange extends Ph
public Map<Integer, Integer> getProjectedColsMap() {
return mProjectedColsMap;
}
-
+
/**
* @return the mProjectedColsMap
*/
@@ -748,7 +724,7 @@ public class POLocalRearrange extends Ph
/**
* @param plans
- * @throws ExecException
+ * @throws ExecException
*/
public void setPlansFromCombiner(List<PhysicalPlan> plans) throws PlanException {
this.plans = plans;
@@ -756,12 +732,12 @@ public class POLocalRearrange extends Ph
mProjectedColsMap.clear();
int keyIndex = 0; // zero based index for fields in the key
for (PhysicalPlan plan : plans) {
- ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0);
+ ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0);
leafOps.add(leaf);
-
+
// don't optimize CROSS
if(!isCross) {
- // Look for the leaf Ops which are POProject operators - get the
+ // Look for the leaf Ops which are POProject operators - get the
// the columns that these POProject Operators are projecting.
// Keep track of the columns which are being projected and
// the position in the "Key" where these will be projected to.
@@ -784,8 +760,9 @@ public class POLocalRearrange extends Ph
throw new PlanException(msg, errCode, PigException.BUG);
}
}
- if(project.getResultType() == DataType.TUPLE)
+ if(project.getResultType() == DataType.TUPLE) {
isKeyTuple = true;
+ }
}
keyIndex++;
}
@@ -798,7 +775,7 @@ public class POLocalRearrange extends Ph
isKeyTuple = true;
}
mProjectedColsMapSize = mProjectedColsMap.size();
-
+
}
protected void setStripKeyFromValue(boolean stripKeyFromValue) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java Thu Dec 23 01:33:44 2010
@@ -41,7 +41,7 @@ import org.apache.pig.impl.util.Utils;
/**
- * The partition rearrange operator is a part of the skewed join
+ * The partition rearrange operator is a part of the skewed join
* implementation. It has an embedded physical plan that
* generates tuples of the form (inpKey,reducerIndex,(indxed inp Tuple)).
*
@@ -49,10 +49,10 @@ import org.apache.pig.impl.util.Utils;
public class POPartitionRearrange extends POLocalRearrange {
/**
- *
+ *
*/
private static final long serialVersionUID = 1L;
-
+
private Integer totalReducers = -1;
// ReducerMap will store the tuple, max reducer index & min reducer index
private static Map<Object, Pair<Integer, Integer> > reducerMap = new HashMap<Object, Pair<Integer, Integer> >();
@@ -60,7 +60,7 @@ public class POPartitionRearrange extend
protected static final BagFactory mBagFactory = BagFactory.getInstance();
private PigContext pigContext;
-
+
public POPartitionRearrange(OperatorKey k) {
this(k, -1, null);
}
@@ -97,9 +97,9 @@ public class POPartitionRearrange extend
}
}
try {
-
- Integer [] redCnt = new Integer[1];
-
+
+ Integer [] redCnt = new Integer[1];
+
reducerMap = MapRedUtil.loadPartitionFileFromLocalCache(
keyDistFile, redCnt, DataType.NULL);
@@ -126,10 +126,10 @@ public class POPartitionRearrange extend
*/
@Override
public Result getNext(Tuple t) throws ExecException {
-
+
Result inp = null;
Result res = null;
-
+
// Load the skewed join key partitioning file
if (!loaded) {
try {
@@ -138,77 +138,48 @@ public class POPartitionRearrange extend
throw new RuntimeException(e);
}
}
-
+
while (true) {
inp = processInput();
- if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
+ if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR) {
break;
- if (inp.returnStatus == POStatus.STATUS_NULL)
+ }
+ if (inp.returnStatus == POStatus.STATUS_NULL) {
continue;
-
+ }
+
for (PhysicalPlan ep : plans) {
ep.attachInput((Tuple)inp.result);
}
List<Result> resLst = new ArrayList<Result>();
for (ExpressionOperator op : leafOps){
-
- switch(op.getResultType()){
- case DataType.BAG:
- res = op.getNext(dummyBag);
- break;
- case DataType.BOOLEAN:
- res = op.getNext(dummyBool);
- break;
- case DataType.BYTEARRAY:
- res = op.getNext(dummyDBA);
- break;
- case DataType.CHARARRAY:
- res = op.getNext(dummyString);
- break;
- case DataType.DOUBLE:
- res = op.getNext(dummyDouble);
- break;
- case DataType.FLOAT:
- res = op.getNext(dummyFloat);
- break;
- case DataType.INTEGER:
- res = op.getNext(dummyInt);
- break;
- case DataType.LONG:
- res = op.getNext(dummyLong);
- break;
- case DataType.MAP:
- res = op.getNext(dummyMap);
- break;
- case DataType.TUPLE:
- res = op.getNext(dummyTuple);
- break;
- }
- if(res.returnStatus!=POStatus.STATUS_OK)
+ res = op.getNext(getDummy(op.getResultType()), op.getResultType());
+ if(res.returnStatus!=POStatus.STATUS_OK) {
return new Result();
+ }
resLst.add(res);
}
res.result = constructPROutput(resLst,(Tuple)inp.result);
-
+
return res;
}
return inp;
}
- // Returns bag of tuples
+ // Returns bag of tuples
protected DataBag constructPROutput(List<Result> resLst, Tuple value) throws ExecException{
Tuple t = super.constructLROutput(resLst, null, value);
//Construct key
Object key = t.get(1);
-
+
// Construct an output bag and feed in the tuples
DataBag opBag = mBagFactory.newDefaultBag();
//Put the index, key, and value
//in a tuple and return
Pair <Integer, Integer> indexes = reducerMap.get(key); // first -> min, second ->max
-
+
// For non skewed keys, we set the partition index to be -1
if (indexes == null) {
indexes = new Pair <Integer, Integer>(-1,0);
@@ -224,10 +195,10 @@ public class POPartitionRearrange extend
opTuple.set(1, reducerIdx.intValue());
opTuple.set(2, key);
opTuple.set(3, t.get(2));
-
+
opBag.add(opTuple);
}
-
+
return opBag;
}
@@ -246,7 +217,7 @@ public class POPartitionRearrange extend
}
/**
- * Make a deep copy of this operator.
+ * Make a deep copy of this operator.
* @throws CloneNotSupportedException
*/
@Override
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java Thu Dec 23 01:33:44 2010
@@ -40,13 +40,13 @@ import org.apache.pig.impl.plan.VisitorE
/**
* A specialized local rearrange operator which behaves
- * like the regular local rearrange in the getNext()
- * as far as getting its input and constructing the
+ * like the regular local rearrange in the getNext()
+ * as far as getting its input and constructing the
* "key" out of the input. It then returns a tuple with
* two fields - the key in the first position and the
* "value" inside a bag in the second position. This output
* format resembles the format out of a Package. This output
- * will feed to a foreach which expects this format.
+ * will feed to a foreach which expects this format.
*/
public class POPreCombinerLocalRearrange extends PhysicalOperator {
@@ -56,11 +56,11 @@ public class POPreCombinerLocalRearrange
protected static BagFactory mBagFactory = BagFactory.getInstance();
private static Log log = LogFactory.getLog(POPreCombinerLocalRearrange.class);
-
+
private static final Result ERR_RESULT = new Result();
-
+
protected List<PhysicalPlan> plans;
-
+
protected List<ExpressionOperator> leafOps;
protected byte keyType;
@@ -112,7 +112,7 @@ public class POPreCombinerLocalRearrange
public void attachInput(Tuple t) {
super.attachInput(t);
}
-
+
/**
* Calls getNext on the generate operator inside the nested
* physical plan. Converts the generated tuple into the proper
@@ -120,89 +120,74 @@ public class POPreCombinerLocalRearrange
*/
@Override
public Result getNext(Tuple t) throws ExecException {
-
+
Result inp = null;
Result res = ERR_RESULT;
while (true) {
inp = processInput();
- if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
+ if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR) {
break;
- if (inp.returnStatus == POStatus.STATUS_NULL)
+ }
+ if (inp.returnStatus == POStatus.STATUS_NULL) {
continue;
-
+ }
+
for (PhysicalPlan ep : plans) {
ep.attachInput((Tuple)inp.result);
}
List<Result> resLst = new ArrayList<Result>();
for (ExpressionOperator op : leafOps){
-
+
switch(op.getResultType()){
case DataType.BAG:
- res = op.getNext(dummyBag);
- break;
case DataType.BOOLEAN:
- res = op.getNext(dummyBool);
- break;
case DataType.BYTEARRAY:
- res = op.getNext(dummyDBA);
- break;
case DataType.CHARARRAY:
- res = op.getNext(dummyString);
- break;
case DataType.DOUBLE:
- res = op.getNext(dummyDouble);
- break;
case DataType.FLOAT:
- res = op.getNext(dummyFloat);
- break;
case DataType.INTEGER:
- res = op.getNext(dummyInt);
- break;
case DataType.LONG:
- res = op.getNext(dummyLong);
- break;
case DataType.MAP:
- res = op.getNext(dummyMap);
- break;
case DataType.TUPLE:
- res = op.getNext(dummyTuple);
+ res = op.getNext(getDummy(op.getResultType()), op.getResultType());
break;
default:
log.error("Invalid result type: "
+ DataType.findType(op.getResultType()));
break;
}
-
+
// allow null as group by key
if (res.returnStatus != POStatus.STATUS_OK
&& res.returnStatus != POStatus.STATUS_NULL) {
return new Result();
}
-
+
resLst.add(res);
}
res.result = constructLROutput(resLst,(Tuple)inp.result);
res.returnStatus = POStatus.STATUS_OK;
-
+
return res;
}
return inp;
}
-
+
protected Tuple constructLROutput(List<Result> resLst, Tuple value) throws ExecException{
//Construct key
Object key;
if(resLst.size()>1){
Tuple t = mTupleFactory.newTuple(resLst.size());
int i=-1;
- for(Result res : resLst)
+ for(Result res : resLst) {
t.set(++i, res.result);
+ }
key = t;
}
else{
key = resLst.get(0).result;
}
-
+
Tuple output = mTupleFactory.newTuple(2);
output.set(0, key);
// put the value in a bag so that the initial
@@ -229,9 +214,9 @@ public class POPreCombinerLocalRearrange
this.plans = plans;
leafOps.clear();
for (PhysicalPlan plan : plans) {
- ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0);
+ ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0);
leafOps.add(leaf);
- }
+ }
}
@Override
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Thu Dec 23 01:33:44 2010
@@ -37,14 +37,11 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
-import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.InternalSortedBag;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.VisitorException;
@@ -55,13 +52,13 @@ import org.apache.pig.pen.util.LineageTr
* This implementation is applicable for both the physical plan and for the
* local backend, as the conversion of physical to mapreduce would see the SORT
* operator and take necessary steps to convert it to a quantile and a sort job.
- *
+ *
* This is a blocking operator. The sortedDataBag accumulates Tuples and sorts
* them only when there an iterator is started. So all the tuples from the input
* operator should be accumulated and filled into the dataBag. The attachInput
* method is not applicable here.
- *
- *
+ *
+ *
*/
//We intentionally skip type checking in backend for performance reasons
@@ -69,7 +66,7 @@ import org.apache.pig.pen.util.LineageTr
public class POSort extends PhysicalOperator {
/**
- *
+ *
*/
private static final long serialVersionUID = 1L;
//private List<Integer> mSortCols;
@@ -85,9 +82,9 @@ public class POSort extends PhysicalOper
public boolean isUDFComparatorUsed = false;
private DataBag sortedBag;
transient Iterator<Tuple> it;
-
+
private SortInfo sortInfo;
-
+
public POSort(
OperatorKey k,
int rp,
@@ -137,18 +134,20 @@ public class POSort extends PhysicalOper
super(k);
}
-
+
public class SortComparator implements Comparator<Tuple>,Serializable {
/**
- *
+ *
*/
private static final long serialVersionUID = 1L;
+ @Override
public int compare(Tuple o1, Tuple o2) {
int count = 0;
int ret = 0;
- if(sortPlans == null || sortPlans.size() == 0)
- return 0;
+ if(sortPlans == null || sortPlans.size() == 0) {
+ return 0;
+ }
for(PhysicalPlan plan : sortPlans) {
try {
plan.attachInput(o1);
@@ -175,48 +174,36 @@ public class POSort extends PhysicalOper
}
}
-
+
} catch (ExecException e) {
log.error("Invalid result while executing the expression plan : " + plan.toString() + "\n" + e.getMessage());
}
}
return ret;
- }
-
+ }
+
private Result getResult(PhysicalPlan plan, byte resultType) throws ExecException {
ExpressionOperator Op = (ExpressionOperator) plan.getLeaves().get(0);
Result res = null;
-
+
switch (resultType) {
case DataType.BYTEARRAY:
- res = Op.getNext(dummyDBA);
- break;
case DataType.CHARARRAY:
- res = Op.getNext(dummyString);
- break;
case DataType.DOUBLE:
- res = Op.getNext(dummyDouble);
- break;
case DataType.FLOAT:
- res = Op.getNext(dummyFloat);
- break;
case DataType.INTEGER:
- res = Op.getNext(dummyInt);
- break;
case DataType.LONG:
- res = Op.getNext(dummyLong);
- break;
case DataType.TUPLE:
- res = Op.getNext(dummyTuple);
+ res = Op.getNext(getDummy(resultType), resultType);
break;
default: {
int errCode = 2082;
String msg = "Did not expect result of type: " +
DataType.findTypeName(resultType);
- throw new ExecException(msg, errCode, PigException.BUG);
+ throw new ExecException(msg, errCode, PigException.BUG);
}
-
+
}
return res;
}
@@ -225,10 +212,11 @@ public class POSort extends PhysicalOper
public class UDFSortComparator implements Comparator<Tuple>,Serializable {
/**
- *
+ *
*/
private static final long serialVersionUID = 1L;
+ @Override
public int compare(Tuple t1, Tuple t2) {
mSortFunc.attachInput(t1, t2);
@@ -241,10 +229,11 @@ public class POSort extends PhysicalOper
log.error("Input not ready. Error on reading from input. "
+ e.getMessage());
}
- if (res != null)
- return (Integer) res.result;
- else
- return 0;
+ if (res != null) {
+ return (Integer) res.result;
+ } else {
+ return 0;
+ }
}
}
@@ -266,21 +255,21 @@ public class POSort extends PhysicalOper
@Override
public Result getNext(Tuple t) throws ExecException {
Result res = new Result();
-
+
if (!inputsAccumulated) {
- res = processInput();
+ res = processInput();
// by default, we create InternalSortedBag, unless user configures
// explicitly to use old bag
String bagType = null;
if (PigMapReduce.sJobConf != null) {
- bagType = PigMapReduce.sJobConf.get("pig.cachedbag.sort.type");
- }
- if (bagType != null && bagType.equalsIgnoreCase("default")) {
- sortedBag = BagFactory.getInstance().newSortedBag(mComparator);
+ bagType = PigMapReduce.sJobConf.get("pig.cachedbag.sort.type");
+ }
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ sortedBag = BagFactory.getInstance().newSortedBag(mComparator);
} else {
sortedBag = new InternalSortedBag(3, mComparator);
}
-
+
while (res.returnStatus != POStatus.STATUS_EOP) {
if (res.returnStatus == POStatus.STATUS_ERR) {
log.error("Error in reading from the inputs");
@@ -357,17 +346,17 @@ public class POSort extends PhysicalOper
public List<Boolean> getMAscCols() {
return mAscCols;
}
-
+
public void setLimit(long l)
{
limit = l;
}
-
+
public long getLimit()
{
return limit;
}
-
+
public boolean isLimited()
{
return (limit!=-1);
@@ -389,7 +378,7 @@ public class POSort extends PhysicalOper
cloneFunc = mSortFunc.clone();
}
// Don't set inputs as PhysicalPlan.clone will take care of that
- return new POSort(new OperatorKey(mKey.scope,
+ return new POSort(new OperatorKey(mKey.scope,
NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
requestedParallelism, null, clonePlans, cloneAsc, cloneFunc);
}
Modified: pig/trunk/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DataType.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DataType.java (original)
+++ pig/trunk/src/org/apache/pig/data/DataType.java Thu Dec 23 01:33:44 2010
@@ -54,7 +54,7 @@ public class DataType {
// IMPORTANT! Order matters here, as compare() below uses the order to
// order unlike datatypes. Don't change this ordering.
// Spaced unevenly to leave room for new entries without changing
- // values or creating order issues.
+ // values or creating order issues.
public static final byte UNKNOWN = 0;
public static final byte NULL = 1;
public static final byte BOOLEAN = 5; // internal use only
@@ -72,14 +72,14 @@ public class DataType {
public static final byte MAP = 100;
public static final byte TUPLE = 110;
public static final byte BAG = 120;
-
+
/**
- * Internal use only; used to store WriteableComparable objects
+ * Internal use only; used to store WriteableComparable objects
* for creating ordered index in MergeJoin. Expecting a object that
* implements Writable interface and has default constructor
*/
- public static final byte GENERIC_WRITABLECOMPARABLE = 123;
-
+ public static final byte GENERIC_WRITABLECOMPARABLE = 123;
+
/**
* Internal use only.
*/
@@ -92,23 +92,38 @@ public class DataType {
* @return byte code of the type, or ERROR if we don't know.
*/
public static byte findType(Object o) {
- if (o == null) return NULL;
+ if (o == null) {
+ return NULL;
+ }
// Try to put the most common first
- if (o instanceof DataByteArray) return BYTEARRAY;
- else if (o instanceof String) return CHARARRAY;
- else if (o instanceof Tuple) return TUPLE;
- else if (o instanceof DataBag) return BAG;
- else if (o instanceof Integer) return INTEGER;
- else if (o instanceof Long) return LONG;
- else if (o instanceof InternalMap) return INTERNALMAP;
- else if (o instanceof Map) return MAP;
- else if (o instanceof Float) return FLOAT;
- else if (o instanceof Double) return DOUBLE;
- else if (o instanceof Boolean) return BOOLEAN;
- else if (o instanceof Byte) return BYTE;
- else if (o instanceof WritableComparable) return GENERIC_WRITABLECOMPARABLE;
- else {return ERROR;}
+ if (o instanceof DataByteArray) {
+ return BYTEARRAY;
+ } else if (o instanceof String) {
+ return CHARARRAY;
+ } else if (o instanceof Tuple) {
+ return TUPLE;
+ } else if (o instanceof DataBag) {
+ return BAG;
+ } else if (o instanceof Integer) {
+ return INTEGER;
+ } else if (o instanceof Long) {
+ return LONG;
+ } else if (o instanceof InternalMap) {
+ return INTERNALMAP;
+ } else if (o instanceof Map) {
+ return MAP;
+ } else if (o instanceof Float) {
+ return FLOAT;
+ } else if (o instanceof Double) {
+ return DOUBLE;
+ } else if (o instanceof Boolean) {
+ return BOOLEAN;
+ } else if (o instanceof Byte) {
+ return BYTE;
+ } else if (o instanceof WritableComparable) {
+ return GENERIC_WRITABLECOMPARABLE;
+ } else {return ERROR;}
}
/**
@@ -118,19 +133,30 @@ public class DataType {
* @return byte code of the type, or ERROR if we don't know.
*/
public static byte findType(Type t) {
- if (t == null) return NULL;
+ if (t == null) {
+ return NULL;
+ }
// Try to put the most common first
- if (t == DataByteArray.class) return BYTEARRAY;
- else if (t == String.class) return CHARARRAY;
- else if (t == Integer.class) return INTEGER;
- else if (t == Long.class) return LONG;
- else if (t == Float.class) return FLOAT;
- else if (t == Double.class) return DOUBLE;
- else if (t == Boolean.class) return BOOLEAN;
- else if (t == Byte.class) return BYTE;
- else if (t == InternalMap.class) return INTERNALMAP;
- else {
+ if (t == DataByteArray.class) {
+ return BYTEARRAY;
+ } else if (t == String.class) {
+ return CHARARRAY;
+ } else if (t == Integer.class) {
+ return INTEGER;
+ } else if (t == Long.class) {
+ return LONG;
+ } else if (t == Float.class) {
+ return FLOAT;
+ } else if (t == Double.class) {
+ return DOUBLE;
+ } else if (t == Boolean.class) {
+ return BOOLEAN;
+ } else if (t == Byte.class) {
+ return BYTE;
+ } else if (t == InternalMap.class) {
+ return INTERNALMAP;
+ } else {
// Might be a tuple or a bag, need to check the interfaces it
// implements
if (t instanceof Class) {
@@ -170,12 +196,13 @@ public class DataType {
matchedWritableComparable = true;
}
}
- if(matchedWritableComparable)
- return GENERIC_WRITABLECOMPARABLE;
-
+ if(matchedWritableComparable) {
+ return GENERIC_WRITABLECOMPARABLE;
+ }
+
return ERROR;
}
-
+
/**
* Return the number of types Pig knows about.
* @return number of types
@@ -190,23 +217,23 @@ public class DataType {
* @return byte array with an entry for each type.
*/
public static byte[] genAllTypes(){
- byte[] types = { DataType.BAG, DataType.BIGCHARARRAY, DataType.BOOLEAN, DataType.BYTE, DataType.BYTEARRAY,
- DataType.CHARARRAY, DataType.DOUBLE, DataType.FLOAT,
+ byte[] types = { DataType.BAG, DataType.BIGCHARARRAY, DataType.BOOLEAN, DataType.BYTE, DataType.BYTEARRAY,
+ DataType.CHARARRAY, DataType.DOUBLE, DataType.FLOAT,
DataType.GENERIC_WRITABLECOMPARABLE,
- DataType.INTEGER, DataType.INTERNALMAP,
+ DataType.INTEGER, DataType.INTERNALMAP,
DataType.LONG, DataType.MAP, DataType.TUPLE};
return types;
}
-
+
private static String[] genAllTypeNames(){
- String[] names = { "BAG", "BIGCHARARRAY", "BOOLEAN", "BYTE", "BYTEARRAY",
- "CHARARRAY", "DOUBLE", "FLOAT",
+ String[] names = { "BAG", "BIGCHARARRAY", "BOOLEAN", "BYTE", "BYTEARRAY",
+ "CHARARRAY", "DOUBLE", "FLOAT",
"GENERIC_WRITABLECOMPARABLE",
"INTEGER","INTERNALMAP",
"LONG", "MAP", "TUPLE" };
return names;
}
-
+
/**
* Get a map of type values to type names.
* @return map
@@ -243,7 +270,7 @@ public class DataType {
public static String findTypeName(Object o) {
return findTypeName(findType(o));
}
-
+
/**
* Get the type name from the type byte code
* @param dt Type byte code
@@ -333,7 +360,7 @@ public class DataType {
* @return true if the type can have a valid schema (i.e., bag or tuple)
*/
public static boolean isSchemaType(byte dataType) {
- return ((dataType == BAG) || (dataType == TUPLE));
+ return ((dataType == BAG) || (dataType == TUPLE));
}
/**
@@ -357,10 +384,10 @@ public class DataType {
}
/**
- * Same as {@link #compare(Object, Object)}, but does not use reflection to determine the type
+ * Same as {@link #compare(Object, Object)}, but does not use reflection to determine the type
* of passed in objects, relying instead on the caller to provide the appropriate values, as
* determined by {@link DataType#findType(Object)}.
- *
+ *
* Use this version in cases where multiple objects of the same type have to be repeatedly compared.
* @param o1 first object
* @param o2 second object
@@ -394,7 +421,7 @@ public class DataType {
return ((Double)o1).compareTo((Double)o2);
case BYTEARRAY:
- return ((DataByteArray)o1).compareTo((DataByteArray)o2);
+ return ((DataByteArray)o1).compareTo(o2);
case CHARARRAY:
return ((String)o1).compareTo((String)o2);
@@ -406,7 +433,7 @@ public class DataType {
int sz2 = m2.size();
if (sz1 < sz2) {
return -1;
- } else if (sz1 > sz2) {
+ } else if (sz1 > sz2) {
return 1;
} else {
// This is bad, but we have to sort the keys of the maps in order
@@ -428,7 +455,7 @@ public class DataType {
if (c != 0) {
return c;
}
- }
+ }
}
return 0;
}
@@ -439,13 +466,13 @@ public class DataType {
case INTERNALMAP:
return -1; // Don't think anyway will want to do this.
-
+
case TUPLE:
- return ((Tuple)o1).compareTo((Tuple)o2);
+ return ((Tuple)o1).compareTo(o2);
case BAG:
- return ((DataBag)o1).compareTo((DataBag)o2);
-
+ return ((DataBag)o1).compareTo(o2);
+
default:
throw new RuntimeException("Unkown type " + dt1 +
@@ -457,7 +484,7 @@ public class DataType {
return 1;
}
}
-
+
public static byte[] toBytes(Object o) throws ExecException {
return toBytes(o, findType(o));
}
@@ -513,8 +540,11 @@ public class DataType {
try {
switch (type) {
case BOOLEAN:
- if (((Boolean)o) == true) return Integer.valueOf(1);
- else return Integer.valueOf(0);
+ if (((Boolean)o) == true) {
+ return Integer.valueOf(1);
+ } else {
+ return Integer.valueOf(0);
+ }
case BYTE:
return Integer.valueOf(((Byte)o).intValue());
@@ -573,7 +603,7 @@ public class DataType {
* forced to an Integer. This isn't particularly efficient, so if you
* already <b>know</b> that the object you have is an Integer you
* should just cast it. Unlike {@link #toInteger(Object, byte)} this
- * method will first determine the type of o and then do the cast.
+ * method will first determine the type of o and then do the cast.
* Use {@link #toInteger(Object, byte)} if you already know the type.
* @param o object to cast
* @return The object as an Integer.
@@ -599,8 +629,11 @@ public class DataType {
try {
switch (type) {
case BOOLEAN:
- if (((Boolean)o) == true) return Long.valueOf(1);
- else return Long.valueOf(0);
+ if (((Boolean)o) == true) {
+ return Long.valueOf(1);
+ } else {
+ return Long.valueOf(0);
+ }
case BYTE:
return Long.valueOf(((Byte)o).longValue());
@@ -660,7 +693,7 @@ public class DataType {
* forced to an Long. This isn't particularly efficient, so if you
* already <b>know</b> that the object you have is a Long you
* should just cast it. Unlike {@link #toLong(Object, byte)} this
- * method will first determine the type of o and then do the cast.
+ * method will first determine the type of o and then do the cast.
* Use {@link #toLong(Object, byte)} if you already know the type.
* @param o object to cast
* @return The object as a Long.
@@ -741,7 +774,7 @@ public class DataType {
* forced to an Float. This isn't particularly efficient, so if you
* already <b>know</b> that the object you have is a Float you
* should just cast it. Unlike {@link #toFloat(Object, byte)} this
- * method will first determine the type of o and then do the cast.
+ * method will first determine the type of o and then do the cast.
* Use {@link #toFloat(Object, byte)} if you already know the type.
* @param o object to cast
* @return The object as a Float.
@@ -822,7 +855,7 @@ public class DataType {
* forced to an Double. This isn't particularly efficient, so if you
* already <b>know</b> that the object you have is a Double you
* should just cast it. Unlike {@link #toDouble(Object, byte)} this
- * method will first determine the type of o and then do the cast.
+ * method will first determine the type of o and then do the cast.
* Use {@link #toDouble(Object, byte)} if you already know the type.
* @param o object to cast
* @return The object as a Double.
@@ -901,7 +934,7 @@ public class DataType {
* forced to a String. This isn't particularly efficient, so if you
* already <b>know</b> that the object you have is a String you
* should just cast it. Unlike {@link #toString(Object, byte)} this
- * method will first determine the type of o and then do the cast.
+ * method will first determine the type of o and then do the cast.
* Use {@link #toString(Object, byte)} if you already know the type.
* @param o object to cast
* @return The object as a String.
@@ -922,7 +955,9 @@ public class DataType {
*/
@SuppressWarnings("unchecked")
public static Map<String, Object> toMap(Object o) throws ExecException {
- if (o == null) return null;
+ if (o == null) {
+ return null;
+ }
if (o instanceof Map && !(o instanceof InternalMap)) {
try {
@@ -950,7 +985,9 @@ public class DataType {
* @throws ExecException if the type can't be forced to a Double.
*/
public static Tuple toTuple(Object o) throws ExecException {
- if (o == null) return null;
+ if (o == null) {
+ return null;
+ }
if (o instanceof Tuple) {
try {
@@ -978,7 +1015,9 @@ public class DataType {
* @throws ExecException if the type can't be forced to a Double.
*/
public static DataBag toBag(Object o) throws ExecException {
- if (o == null) return null;
+ if (o == null) {
+ return null;
+ }
if (o instanceof DataBag) {
try {
@@ -1007,7 +1046,7 @@ public class DataType {
}
System.out.println(t.toString());
}
-
+
/**
* Determine if this type is a numeric type.
* @param t type (as byte value) to test
@@ -1020,9 +1059,9 @@ public class DataType {
case FLOAT: return true ;
case DOUBLE: return true ;
default: return false ;
- }
+ }
}
-
+
/**
* Determine if this is a type that can work can be done on.
* @param t type (as a byte value) to test
@@ -1038,7 +1077,7 @@ public class DataType {
}
/**
- * Merge types if possible. Merging types means finding a type that one
+ * Merge types if possible. Merging types means finding a type that one
* or both types can be upcast to.
* @param type1
* @param type2
@@ -1074,7 +1113,7 @@ public class DataType {
// else return just ERROR
return DataType.ERROR ;
}
-
+
/**
* Given a map, turn it into a String.
* @param m map
@@ -1110,15 +1149,23 @@ public class DataType {
* the same bytes.
*/
public static boolean equalByteArrays(byte[] lhs, byte[] rhs) {
- if(lhs == null && rhs == null) return true;
- if(lhs == null || rhs == null) return false;
- if(lhs.length != rhs.length) return false;
+ if(lhs == null && rhs == null) {
+ return true;
+ }
+ if(lhs == null || rhs == null) {
+ return false;
+ }
+ if(lhs.length != rhs.length) {
+ return false;
+ }
for(int i = 0; i < lhs.length; ++i) {
- if(lhs[i] != rhs[i]) return false;
+ if(lhs[i] != rhs[i]) {
+ return false;
+ }
}
return true;
}
-
+
/**
* Utility method that determines the schema from the passed in dataType.
@@ -1135,7 +1182,7 @@ public class DataType {
* @throws FrontendException
* @throws SchemaMergeException
*/
- private static Schema.FieldSchema determineFieldSchema(byte dataType, Iterator fieldIter,
+ private static Schema.FieldSchema determineFieldSchema(byte dataType, Iterator fieldIter,
long fieldNum, Class klass ) throws ExecException, FrontendException, SchemaMergeException {
switch (dataType) {
case NULL:
@@ -1148,14 +1195,14 @@ public class DataType {
case DOUBLE:
case BYTEARRAY:
case CHARARRAY:
- case MAP:
+ case MAP:
return new Schema.FieldSchema(null, dataType);
case TUPLE: {
Schema schema = null;
if(fieldNum != 0) {
schema = new Schema();
for(int i = 0; i < fieldNum; ++i) {
- schema.add(determineFieldSchema(klass.cast(fieldIter.next())));
+ schema.add(determineFieldSchema(klass.cast(fieldIter.next())));
}
}
return new Schema.FieldSchema(null, schema, TUPLE);
@@ -1187,7 +1234,7 @@ public class DataType {
bagSchema.setTwoLevelAccessRequired(true);
return new Schema.FieldSchema(null, bagSchema, BAG);
}
- schema = Schema.mergeSchema(schema, currSchema, false, false, false);
+ schema = Schema.mergeSchema(schema, currSchema, false, false, false);
}
Schema.FieldSchema tupleFs = new Schema.FieldSchema(null, schema, TUPLE);
bagSchema = new Schema(tupleFs);
@@ -1203,21 +1250,21 @@ public class DataType {
String msg = "Cannot determine field schema";
throw new ExecException(msg, errCode, PigException.INPUT);
}
-
+
}
}
-
+
/***
* Determine the field schema of an ResourceFieldSchema
* @param rcFieldSchema the rcFieldSchema we want translated
* @return the field schema corresponding to the object
* @throws ExecException,FrontendException,SchemaMergeException
*/
- public static Schema.FieldSchema determineFieldSchema(ResourceSchema.ResourceFieldSchema rcFieldSchema)
+ public static Schema.FieldSchema determineFieldSchema(ResourceSchema.ResourceFieldSchema rcFieldSchema)
throws ExecException, FrontendException, SchemaMergeException {
byte dt = rcFieldSchema.getType();
Iterator<ResourceSchema.ResourceFieldSchema> fieldIter = null;
- long fieldNum = 0;
+ long fieldNum = 0;
if (dt == TUPLE || dt == BAG ) {
fieldIter = Arrays.asList(rcFieldSchema.getSchema().getFields()).iterator();
fieldNum = rcFieldSchema.getSchema().getFields().length;
@@ -1225,14 +1272,14 @@ public class DataType {
return determineFieldSchema(dt, fieldIter, fieldNum, ResourceSchema.ResourceFieldSchema.class);
}
-
+
/***
* Determine the field schema of an object
* @param o the object whose field schema is to be determined
* @return the field schema corresponding to the object
* @throws ExecException,FrontendException,SchemaMergeException
*/
- public static Schema.FieldSchema determineFieldSchema(Object o)
+ public static Schema.FieldSchema determineFieldSchema(Object o)
throws ExecException, FrontendException, SchemaMergeException {
byte dt = findType(o);
Iterator fieldIter = null;