Posted to commits@pig.apache.org by ol...@apache.org on 2009/01/22 23:01:51 UTC

svn commit: r736817 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relational...

Author: olga
Date: Thu Jan 22 14:01:50 2009
New Revision: 736817

URL: http://svn.apache.org/viewvc?rev=736817&view=rev
Log:
PIG-628: misc perf improvements

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
    hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java
    hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=736817&r1=736816&r2=736817&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Jan 22 14:01:50 2009
@@ -370,3 +370,5 @@
     PIG-623: Fix spelling errors in output messages (tomwhite via sms)
 
     PIG-622: Include pig executable in distribution (tomwhite via sms)
+
+    PIG-628: misc performance improvements (pradeepk via olgan)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=736817&r1=736816&r2=736817&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Jan 22 14:01:50 2009
@@ -38,6 +38,7 @@
 import org.apache.pig.impl.builtin.RandomSampleLoader;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
@@ -652,7 +653,11 @@
         eps.add(ep);
         
         POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        lr.setIndex(0);
+        try {
+            lr.setIndex(0);
+        } catch (ExecException e) {
+            throw new PlanException("Unable to set index on the newly created POLocalRearrange.", e);
+        }
         lr.setKeyType(DataType.TUPLE);
         lr.setPlans(eps);
         lr.setResultType(DataType.TUPLE);
@@ -1019,7 +1024,11 @@
         }
         
         POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        lr.setIndex(0);
+        try {
+            lr.setIndex(0);
+        } catch (ExecException e) {
+            throw new PlanException("Unable to set index on newly create POLocalRearrange.", e);
+        }
         lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE :
             keyType);
         lr.setPlans(eps1);
@@ -1060,7 +1069,11 @@
             eps_c2.addAll(sort.getSortPlans());
         
 	        POLocalRearrange lr_c2 = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
-	        lr_c2.setIndex(0);
+	        try {
+                lr_c2.setIndex(0);
+            } catch (ExecException e) {
+                throw new PlanException("Unable to set index on newly created POLocalRearrange.", e);
+            }
 	        lr_c2.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
 	        lr_c2.setPlans(eps_c2);
 	        lr_c2.setResultType(DataType.TUPLE);
@@ -1149,7 +1162,11 @@
         eps.add(ep1);
         
         POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        lr.setIndex(0);
+        try {
+            lr.setIndex(0);
+        } catch (ExecException e) {
+            throw new PlanException("Unable to set index on newly created POLocalRearrange.", e);
+        }
         lr.setKeyType(DataType.CHARARRAY);
         lr.setPlans(eps);
         lr.setResultType(DataType.TUPLE);

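POLocalRearrange.setIndex now declares ExecException (see the POLocalRearrange diff below), so every call site in MRCompiler wraps the call and rethrows the PlanException the surrounding compiler methods already declare; the two translation visitors further down do the same thing with VisitorException. A minimal sketch of that wrapping pattern, where SetterException, BuilderException, Rearrange and CompilerSketch are simplified stand-ins rather than the real Pig classes:

    // Illustrative only: SetterException/BuilderException stand in for Pig's
    // ExecException/PlanException.
    class SetterException extends Exception {
        SetterException(String msg) { super(msg); }
    }

    class BuilderException extends Exception {
        BuilderException(String msg, Throwable cause) { super(msg, cause); }
    }

    class Rearrange {
        private byte index = -1;

        // The setter now does real work up front, so it declares a checked exception.
        void setIndex(int index) throws SetterException {
            if (index > 0x40) {
                throw new SetterException("index out of range: " + index);
            }
            this.index = (byte) index;
        }
    }

    class CompilerSketch {
        Rearrange newRearrange() throws BuilderException {
            Rearrange lr = new Rearrange();
            try {
                lr.setIndex(0);
            } catch (SetterException e) {
                // Re-wrap in the exception type this layer already declares.
                throw new BuilderException(
                        "Unable to set index on the newly created rearrange operator.", e);
            }
            return lr;
        }
    }
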
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=736817&r1=736816&r2=736817&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Thu Jan 22 14:01:50 2009
@@ -75,6 +75,9 @@
         
         ProgressableReporter pigReporter;
         
+        PhysicalOperator[] roots;
+        PhysicalOperator leaf;
+        
         /**
          * Configures the Reduce plan, the POPackage operator
          * and the reporter thread
@@ -102,6 +105,10 @@
                 long sleepTime = jConf.getLong("pig.reporter.sleep.time", 10000);
 
                 pigReporter = new ProgressableReporter();
+                if(!(cp.isEmpty())) {
+                    roots = cp.getRoots().toArray(new PhysicalOperator[1]);
+                    leaf = cp.getLeaves().get(0);
+                }
             } catch (IOException e) {
                 log.error(e.getMessage() + "was caused by:");
                 log.error(e.getCause().getMessage());
@@ -157,11 +164,9 @@
                         return false;
                     }
                     
-                    cp.attachInput(packRes);
-
-                    List<PhysicalOperator> leaves = cp.getLeaves();
-
-                    PhysicalOperator leaf = leaves.get(0);
+                    for (int i = 0; i < roots.length; i++) {
+                        roots[i].attachInput(packRes);
+                    }
                     while(true){
                         Result redRes = leaf.getNext(DUMMYTUPLE);
                         

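The combiner used to call cp.getLeaves() and attach input to the whole plan for every key group it processed; this change hoists those lookups into configure(), caching the plan's roots and single leaf in fields, so the per-record path only attaches the packaged input to the cached roots. A rough sketch of the pattern, where Op, Plan and CombineTask are hypothetical stand-ins for Pig's PhysicalOperator/PhysicalPlan:

    import java.util.List;

    // Hypothetical stand-ins for Pig's PhysicalOperator and PhysicalPlan.
    interface Op {
        void attachInput(Object input);
    }

    interface Plan {
        boolean isEmpty();
        List<Op> getRoots();
        List<Op> getLeaves();
    }

    class CombineTask {
        private Op[] roots;   // cached once per task
        private Op leaf;      // cached once per task

        // Called once per task, like Hadoop's configure().
        void configure(Plan plan) {
            if (!plan.isEmpty()) {
                roots = plan.getRoots().toArray(new Op[0]);
                leaf = plan.getLeaves().get(0);
            }
        }

        // Called once per key group: no plan traversal or list allocation here.
        void process(Object packagedInput) {
            for (Op root : roots) {
                root.attachInput(packagedInput);
            }
            // ... then pull results from 'leaf' until the pipeline is drained ...
        }
    }
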
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=736817&r1=736816&r2=736817&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Thu Jan 22 14:01:50 2009
@@ -154,6 +154,8 @@
 
         protected boolean errorInReduce = false;
         
+        PhysicalOperator[] roots;
+        private PhysicalOperator leaf;
         /**
          * Configures the Reduce plan, the POPackage operator
          * and the reporter thread
@@ -180,6 +182,10 @@
                 long sleepTime = jConf.getLong("pig.reporter.sleep.time", 10000);
 
                 pigReporter = new ProgressableReporter();
+                if(!(rp.isEmpty())) {
+                    roots = rp.getRoots().toArray(new PhysicalOperator[1]);
+                    leaf = rp.getLeaves().get(0);
+                }
             } catch (IOException e) {
                 log.error(e.getMessage() + "was caused by:");
                 log.error(e.getCause().getMessage());
@@ -238,12 +244,9 @@
                         oc.collect(null, packRes);
                         return false;
                     }
-                    
-                    rp.attachInput(packRes);
-
-                    List<PhysicalOperator> leaves = rp.getLeaves();
-                    
-                    PhysicalOperator leaf = leaves.get(0);
+                    for (int i = 0; i < roots.length; i++) {
+                        roots[i].attachInput(packRes);
+                    }
                     runPipeline(leaf);
                     
                 }
@@ -336,8 +339,6 @@
                 // This will result in nothing happening in the case
                 // where there is no stream in the pipeline
                 rp.endOfAllInput = true;
-                List<PhysicalOperator> leaves = rp.getLeaves();
-                PhysicalOperator leaf = leaves.get(0);
                 try {
                     runPipeline(leaf);
                 } catch (ExecException e) {

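PigMapReduce gets the same roots/leaf caching as PigCombiner, and the end-of-input path in close() now reuses the cached leaf instead of walking rp.getLeaves() again. For context, the pipeline is driven by repeatedly pulling from that leaf until it reports end of processing; the loop below is an illustrative sketch of such a pull loop, with Status, Result, Operator and emit() as made-up stand-ins (Pig's real code uses POStatus constants and an OutputCollector):

    // Illustrative status codes; Pig's real code uses POStatus.STATUS_OK,
    // STATUS_EOP, STATUS_NULL and STATUS_ERR.
    enum Status { OK, NULL, END_OF_PROCESSING, ERROR }

    class Result {
        Status status;
        Object value;
    }

    interface Operator {
        Result getNext();
    }

    class PipelineRunner {
        // Pull from the cached leaf until this input's pipeline is exhausted.
        void runPipeline(Operator leaf) {
            while (true) {
                Result res = leaf.getNext();
                if (res.status == Status.END_OF_PROCESSING) {
                    break;                          // nothing more for this input
                } else if (res.status == Status.OK) {
                    emit(res.value);                // hand the tuple downstream
                } else if (res.status == Status.NULL) {
                    continue;                       // skip empty results
                } else {
                    throw new RuntimeException("error while processing pipeline");
                }
            }
        }

        // Stand-in for OutputCollector.collect in the real reducer.
        private void emit(Object value) {
            System.out.println(value);
        }
    }
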
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=736817&r1=736816&r2=736817&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Thu Jan 22 14:01:50 2009
@@ -594,6 +594,8 @@
             log.error("Invalid physical operators in the physical plan"
                     + e1.getMessage());
             throw new VisitorException(e1);
+        } catch (ExecException e) {
+            throw new VisitorException(e);
         }
         
         poPackage.setKeyType(DataType.TUPLE);
@@ -674,7 +676,11 @@
             }
             currentPlan = currentPlans.pop();
             physOp.setPlans(exprPlans);
-            physOp.setIndex(count++);
+            try {
+                physOp.setIndex(count++);
+            } catch (ExecException e1) {
+                throw new VisitorException(e1);
+            }
             if (plans.size() > 1) {
                 type = DataType.TUPLE;
                 physOp.setKeyType(type);
@@ -743,8 +749,13 @@
                 keyTypes.add(exprPlans.get(0).getLeaves().get(0).getResultType());
             }
         }
-        POFRJoin pfrj = new POFRJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),frj.getRequestedParallelism(),
-                                    inp, ppLists, keyTypes, null, fragment);
+        POFRJoin pfrj;
+        try {
+            pfrj = new POFRJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),frj.getRequestedParallelism(),
+                                        inp, ppLists, keyTypes, null, fragment);
+        } catch (ExecException e1) {
+            throw new VisitorException(e1);
+        }
         pfrj.setResultType(DataType.TUPLE);
         currentPlan.add(pfrj);
         for (LogicalOperator op : inputs) {

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java?rev=736817&r1=736816&r2=736817&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java Thu Jan 22 14:01:50 2009
@@ -57,6 +57,8 @@
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
     private boolean[] mBags; // For each field, indicates whether or not it
+
+    private Map<Integer, Integer> keyLookup;
                              // needs to be put in a bag.
 
     /**
@@ -84,6 +86,22 @@
     public String name() {
         return "PostCombinerPackage" + "[" + DataType.findTypeName(resultType) + "]" + "{" + DataType.findTypeName(keyType) + "}" +" - " + mKey.toString();
     }
+    
+    /**
+     * @param keyInfo the keyInfo to set
+     */
+    public void setKeyInfo(Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo) {
+        this.keyInfo = keyInfo;
+        // TODO: IMPORTANT ASSUMPTION: Currently we only combine in the
+        // group case and not in cogroups. So there should only
+        // be one LocalRearrange from which we get the keyInfo for
+        // which field in the value is in the key. This LocalRearrange
+        // has an index of 0. When we do support combiner in Cogroups
+        // THIS WILL NEED TO BE REVISITED.
+        Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
+            keyInfo.get(0); // assumption: only group are "combinable", hence index 0
+        keyLookup = lrKeyInfo.second;
+    }
 
     @Override
     public Result getNext(Tuple t) throws ExecException {
@@ -100,15 +118,7 @@
         while (tupIter.hasNext()) {
             NullableTuple ntup = tupIter.next();
             Tuple tup = (Tuple)ntup.getValueAsPigType();
-            // TODO: IMPORTANT ASSUMPTION: Currently we only combine in the
-            // group case and not in cogroups. So there should only
-            // be one LocalRearrange from which we get the keyInfo for
-            // which field in the value is in the key. This LocalRearrange
-            // has an index of -1. When we do support combiner in Cogroups
-            // THIS WILL NEED TO BE REVISITED.
-            Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
-                keyInfo.get(0); // assumption: only group are "combinable", hence index 0
-            Map<Integer, Integer> keyLookup = lrKeyInfo.second;
+            
             int tupIndex = 0; // an index for accessing elements from 
                               // the value (tup) that we have currently
             for(int i = 0; i < mBags.length; i++) {

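POCombinerPackage used to look up the LocalRearrange key information inside getNext(), i.e. once per incoming tuple; the change moves that work into the new setKeyInfo() method and caches the resulting map in the keyLookup field, so the per-tuple loop only reads the cached map. A small sketch of that hoisting with simplified types (the real keyInfo maps an index to a Pair of a flag and a column map):

    import java.util.Map;

    class CombinerPackageSketch {
        // Derived once when key info is installed, then reused for every tuple.
        private Map<Integer, Integer> keyLookup;

        // Called once while wiring up the operator (simplified signature).
        void setKeyInfo(Map<Integer, Map<Integer, Integer>> keyInfo) {
            // Only the single-input group case is combined today, so index 0
            // is the only LocalRearrange entry we need.
            keyLookup = keyInfo.get(0);
        }

        // Hot path, called for every tuple in every key group: just a map read.
        boolean valueFieldComesFromKey(int fieldIndex) {
            return keyLookup.containsKey(fieldIndex);
        }
    }
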
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=736817&r1=736816&r2=736817&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Thu Jan 22 14:01:50 2009
@@ -92,23 +92,23 @@
     private transient BagFactory mBagFactory;
     private boolean setUp;
     
-    public POFRJoin(OperatorKey k) throws PlanException {
+    public POFRJoin(OperatorKey k) throws PlanException, ExecException {
         this(k,-1,null, null, null, null, -1);
     }
 
-    public POFRJoin(OperatorKey k, int rp) throws PlanException {
+    public POFRJoin(OperatorKey k, int rp) throws PlanException, ExecException {
         this(k, rp, null, null, null, null, -1);
     }
 
-    public POFRJoin(OperatorKey k, List<PhysicalOperator> inp) throws PlanException {
+    public POFRJoin(OperatorKey k, List<PhysicalOperator> inp) throws PlanException, ExecException {
         this(k, -1, inp, null, null, null, -1);
     }
 
-    public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp) throws PlanException {
+    public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp) throws PlanException, ExecException {
         this(k,rp,inp,null, null, null, -1);
     }
     
-    public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, List<List<PhysicalPlan>> ppLists, List<Byte> keyTypes, FileSpec[] replFiles, int fragment){
+    public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, List<List<PhysicalPlan>> ppLists, List<Byte> keyTypes, FileSpec[] replFiles, int fragment) throws ExecException{
         super(k,rp,inp);
         
         phyPlanLists = ppLists;
@@ -135,8 +135,9 @@
     /**
      * Configures the Local Rearrange operators & the foreach operator
      * @param old
+     * @throws ExecException 
      */
-    private void createJoinPlans(OperatorKey old){
+    private void createJoinPlans(OperatorKey old) throws ExecException{
         List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
         List<Boolean> flatList = new ArrayList<Boolean>();
         

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=736817&r1=736816&r2=736817&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Thu Jan 22 14:01:50 2009
@@ -104,6 +104,8 @@
     private ArrayList<Integer> minValuePositions;
     private int minValuePositionsSize = 0;
 
+    private Tuple lrOutput;
+    
     public POLocalRearrange(OperatorKey k) {
         this(k, -1, null);
     }
@@ -121,6 +123,7 @@
         index = -1;
         leafOps = new ArrayList<ExpressionOperator>();
         mProjectedColsMap = new HashMap<Integer, Integer>();
+        lrOutput = mTupleFactory.newTuple(3);
     }
 
     @Override
@@ -149,13 +152,14 @@
         return index;
     }
 
-    public void setIndex(int index) {
+    public void setIndex(int index) throws ExecException {
         if (index > 0x40) {
             throw new RuntimeException("Cogroups with more than 127 inputs "
                 + " not supported.");
         } else {
             this.index = (byte)index;
         }
+        lrOutput.set(0, new Byte(this.index));
     }
 
     public boolean isDistinct() { 
@@ -258,31 +262,27 @@
             key = resLst.get(0).result;
         }
         
-        Tuple output = mTupleFactory.newTuple(3);
         if (mIsDistinct) {
 
             //Put the key and the indexed tuple
             //in a tuple and return
-            output.set(0, new Byte((byte)0));
-            output.set(1, key);
-            output.set(2, mFakeTuple);
-            return output;
+            lrOutput.set(1, key);
+            lrOutput.set(2, mFakeTuple);
+            return lrOutput;
         } else if(isCross){
         
             for(int i=0;i<plans.size();i++)
                 value.getAll().remove(0);
             //Put the index, key, and value
             //in a tuple and return
-            output.set(0, new Byte(index));
-            output.set(1, key);
-            output.set(2, value);
-            return output;
+            lrOutput.set(1, key);
+            lrOutput.set(2, value);
+            return lrOutput;
         } else {
 
             //Put the index, key, and value
             //in a tuple and return
-            output.set(0, new Byte(index));
-            output.set(1, key);
+            lrOutput.set(1, key);
             
             // strip off the columns in the "value" which 
             // are present in the "key"
@@ -321,17 +321,17 @@
                     minimalValue = mTupleFactory.newTuple();
     
                 }
-                output.set(2, minimalValue);
+                lrOutput.set(2, minimalValue);
             
             } else {
             
                 // there were no columns in the "key"
                 // which we can strip off from the "value"
                 // so just send the value we got
-                output.set(2, value);
+                lrOutput.set(2, value);
                 
             }
-            return output;
+            return lrOutput;
         }
     }
 
@@ -417,6 +417,13 @@
         clone.setPlans(clonePlans);
         clone.keyType = keyType;
         clone.index = index;
+        try {
+            clone.lrOutput.set(0, index);
+        } catch (ExecException e) {
+            CloneNotSupportedException cnse = new CloneNotSupportedException();
+            cnse.initCause(e);
+            throw cnse;
+        }
         // Needs to be called as setDistinct so that the fake index tuple gets
         // created.
         clone.setDistinct(mIsDistinct);

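POLocalRearrange previously built a fresh three-field tuple for every record it emitted; it now allocates lrOutput once in the constructor, fills slot 0 with the index whenever setIndex() is called (and again in clone()), and per record only overwrites the key and value slots. A compact sketch of the reuse pattern, where Slot3Tuple stands in for a tuple obtained from Pig's TupleFactory:

    // Slot3Tuple stands in for a 3-field tuple from Pig's TupleFactory.
    class Slot3Tuple {
        private final Object[] fields = new Object[3];
        void set(int i, Object v) { fields[i] = v; }
        Object get(int i)         { return fields[i]; }
    }

    class LocalRearrangeSketch {
        // Allocated once; slot 0 (the index) changes only via setIndex(),
        // slots 1 and 2 (key and value) are overwritten for every record.
        private final Slot3Tuple lrOutput = new Slot3Tuple();
        private byte index = -1;

        void setIndex(int index) {
            this.index = (byte) index;
            lrOutput.set(0, this.index);    // pre-fill the constant slot
        }

        // Per-record path: no allocation, just two slot updates.
        Slot3Tuple constructOutput(Object key, Object value) {
            lrOutput.set(1, key);
            lrOutput.set(2, value);
            return lrOutput;
        }
    }

Reusing one output tuple is presumably safe here because the map pipeline serializes the returned tuple before the next record is processed; a cloned operator has to copy the index into slot 0 of its own lrOutput, which is what the new code in clone() does.
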
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java?rev=736817&r1=736816&r2=736817&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java Thu Jan 22 14:01:50 2009
@@ -23,6 +23,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -90,7 +91,11 @@
             }
             currentPlan = currentPlans.pop();
             physOp.setPlans(exprPlans);
-            physOp.setIndex(count++);
+            try {
+                physOp.setIndex(count++);
+            } catch (ExecException e1) {
+                throw new VisitorException(e1);
+            }
             if (plans.size() > 1) {
                 type = DataType.TUPLE;
                 physOp.setKeyType(type);

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java?rev=736817&r1=736816&r2=736817&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java Thu Jan 22 14:01:50 2009
@@ -42,11 +42,6 @@
         switch (b) {
             case DataType.TUPLE: {
                 
-                // check if it is a null tuple
-                byte nullMarker = in.readByte();
-                if(nullMarker == Tuple.NULL) {
-                    return null;
-                }
                 // Don't use Tuple.readFields, because it requires you to
                 // create a tuple with no size and then append fields.
                 // That's less efficient than allocating the tuple size up

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java?rev=736817&r1=736816&r2=736817&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java Thu Jan 22 14:01:50 2009
@@ -284,20 +284,10 @@
 
     public void write(DataOutput out) throws IOException {
         out.writeByte(DataType.TUPLE);
-        if(isNull == true) {
-            out.writeByte(NULL);            
-        } else {
-            out.writeByte(NOTNULL);
-            int sz = size();
-            out.writeInt(sz);
-            for (int i = 0; i < sz; i++) {
-                try {
-                    Object d = get(i);
-                } catch (ExecException ee) {
-                    throw new RuntimeException(ee);
-                }
-                DataReaderWriter.writeDatum(out, mFields.get(i));
-            }
+        int sz = size();
+        out.writeInt(sz);
+        for (int i = 0; i < sz; i++) {
+            DataReaderWriter.writeDatum(out, mFields.get(i));
         }
     }
 
@@ -311,20 +301,13 @@
             throw new IOException("Unexpected data while reading tuple " +
                 "from binary file");
         }
-        byte nullMarker = in.readByte();
-        if(nullMarker == NULL) {
-            isNull = true;
-            return;
-        } else {
-            isNull = false;
-            // Read the number of fields
-            int sz = in.readInt();
-            for (int i = 0; i < sz; i++) {
-                try {
-                    append(DataReaderWriter.readDatum(in));
-                } catch (ExecException ee) {
-                    throw new RuntimeException(ee);
-                }
+        // Read the number of fields
+        int sz = in.readInt();
+        for (int i = 0; i < sz; i++) {
+            try {
+                append(DataReaderWriter.readDatum(in));
+            } catch (ExecException ee) {
+                throw new RuntimeException(ee);
             }
         }
     }
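
The last two files slim the serialized tuple format: DefaultTuple.write no longer emits a NULL/NOTNULL marker byte after the type byte (and drops the throwaway get(i) call per field), and the tuple branch in DataReaderWriter stops reading one, saving a byte and a branch per tuple; top-level null handling is presumably left to the nullable wrapper types (e.g. NullableTuple) used between map and reduce. A self-contained sketch of the slimmer layout, where writeDatum/readDatum are toy stand-ins for DataReaderWriter and 110 is used only as an illustrative type tag:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class TupleIOSketch {
        static final byte TUPLE = 110;       // illustrative type tag

        // New layout: type byte, field count, fields - no NULL/NOTNULL marker.
        static void write(DataOutput out, List<Object> fields) throws IOException {
            out.writeByte(TUPLE);
            out.writeInt(fields.size());
            for (Object f : fields) {
                writeDatum(out, f);          // stand-in for DataReaderWriter.writeDatum
            }
        }

        static List<Object> read(DataInput in) throws IOException {
            if (in.readByte() != TUPLE) {
                throw new IOException("Unexpected data while reading tuple from binary file");
            }
            int sz = in.readInt();
            List<Object> fields = new ArrayList<Object>(sz);
            for (int i = 0; i < sz; i++) {
                fields.add(readDatum(in));   // stand-in for DataReaderWriter.readDatum
            }
            return fields;
        }

        // Toy datum codec (strings only) so the sketch is self-contained.
        static void writeDatum(DataOutput out, Object o) throws IOException {
            out.writeUTF(String.valueOf(o));
        }

        static Object readDatum(DataInput in) throws IOException {
            return in.readUTF();
        }
    }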