Posted to commits@pig.apache.org by ga...@apache.org on 2009/05/11 22:46:57 UTC

svn commit: r773683 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/local/executionen...

Author: gates
Date: Mon May 11 20:46:56 2009
New Revision: 773683

URL: http://svn.apache.org/viewvc?rev=773683&view=rev
Log:
PIG-626: Add access to hadoop counters.
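
In practice, the API added here can be exercised roughly as follows. This is a
minimal sketch modeled on the TestCounters test added below; the class name
CounterDemo, the file 'input.txt', and the aliases are illustrative, not part of
the patch:

    import java.io.IOException;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.tools.pigstats.PigStats;

    public class CounterDemo {
        public static void main(String[] args) throws IOException {
            PigServer pigServer = new PigServer(ExecType.MAPREDUCE);
            pigServer.registerQuery("a = load 'input.txt';");
            pigServer.registerQuery("b = filter a by $0 > 50;");
            // store() returns an ExecJob; its getStatistics() now yields a
            // PigStats object instead of the old Map<String, Object>.
            PigStats stats = pigServer.store("b", "output").getStatistics();
            System.out.println("Records written : " + stats.getRecordsWritten());
            System.out.println("Bytes written : " + stats.getBytesWritten());
        }
    }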


Added:
    hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/counters/
    hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/counters/POCounter.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalJob.java
    hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java
    hadoop/pig/trunk/test/org/apache/pig/test/Util.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon May 11 20:46:56 2009
@@ -40,6 +40,8 @@
 
 PIG-701: Implement IVY for resolving pig dependencies (gkesavan)
 
+PIG-626: Add access to hadoop counters (shubhamc via gates).
+
 BUG FIXES
 
 PIG-733: Order by sampling dumps entire sample to hdfs which causes dfs

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java Mon May 11 20:46:56 2009
@@ -24,6 +24,7 @@
 import java.io.OutputStream;
 
 import org.apache.pig.data.Tuple;
+import org.apache.pig.tools.pigstats.PigStats;
 
 /**
  * Abstraction on a job that the execution engine runs. It allows the front-end to
@@ -78,7 +79,7 @@
      * 
      * @return statistics relevant to the execution engine
      */
-    public Map<String, Object> getStatistics();
+    public PigStats getStatistics();
 
     /**
      * hook for asynchronous notification of job completion pushed from the back-end

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Mon May 11 20:46:56 2009
@@ -72,6 +72,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.shock.SSHSocketImplFactory;
+import org.apache.pig.tools.pigstats.PigStats;
 
 
 public class HExecutionEngine implements ExecutionEngine {
@@ -258,9 +259,9 @@
             FileSpec spec = ExecTools.checkLeafIsStore(plan, pigContext);
 
             MapReduceLauncher launcher = new MapReduceLauncher();
-            boolean success = launcher.launchPig(plan, jobName, pigContext);
-            if(success)
-                return new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, spec);
+            PigStats stats = launcher.launchPig(plan, jobName, pigContext);
+            if(stats != null)
+                return new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, spec, stats);
             else
                 return new HJob(ExecJob.JOB_STATUS.FAILED, pigContext, null);
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java Mon May 11 20:46:56 2009
@@ -35,6 +35,7 @@
 import org.apache.pig.PigException;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.tools.pigstats.PigStats;
 
 
 public class HJob implements ExecJob {
@@ -44,6 +45,7 @@
     protected JOB_STATUS status;
     protected PigContext pigContext;
     protected FileSpec outFileSpec;
+    private PigStats stats;
     
     public HJob(JOB_STATUS status,
                 PigContext pigContext,
@@ -53,6 +55,16 @@
         this.outFileSpec = outFileSpec;
     }
     
+    public HJob(JOB_STATUS status,
+            PigContext pigContext,
+            FileSpec outFileSpec,
+            PigStats stats) {
+        this.status = status;
+        this.pigContext = pigContext;
+        this.outFileSpec = outFileSpec;
+        this.stats = stats;
+    }
+    
     public JOB_STATUS getStatus() {
         return status;
     }
@@ -125,8 +137,9 @@
         return props;
     }
 
-    public Map<String, Object> getStatistics() {
-        throw new UnsupportedOperationException();
+    public PigStats getStatistics() {
+        return stats;
     }
 
     public void completionNotification(Object cookie) {

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java Mon May 11 20:46:56 2009
@@ -48,6 +48,7 @@
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.tools.pigstats.PigStats;
 
 public abstract class Launcher {
     private static final Log log = LogFactory.getLog(Launcher.class);
@@ -94,7 +95,7 @@
      * @throws ExecException
      * @throws JobCreationException
      */
-    public abstract boolean launchPig(PhysicalPlan php, String grpName, PigContext pc)
+    public abstract PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc)
             throws PlanException, VisitorException, IOException, ExecException,
             JobCreationException, Exception;
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Mon May 11 20:46:56 2009
@@ -56,6 +56,7 @@
 import org.apache.pig.impl.plan.CompilationMessageCollector.Message;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.util.ConfigurationValidator;
+import org.apache.pig.tools.pigstats.PigStats;
 
 /**
  * Main class that launches pig for Map Reduce
@@ -69,7 +70,7 @@
     private boolean aggregateWarning = false;
     
     @Override
-    public boolean launchPig(PhysicalPlan php,
+    public PigStats launchPig(PhysicalPlan php,
                              String grpName,
                              PigContext pc) throws PlanException,
                                                    VisitorException,
@@ -80,6 +81,10 @@
         long sleepTime = 500;
         aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
         MROperPlan mrp = compile(php, pc);
+        PigStats stats = new PigStats();
+        stats.setMROperatorPlan(mrp);
+        stats.setExecType(pc.getExecType());
+        stats.setPhysicalPlan(php);
         
         ExecutionEngine exe = pc.getExecutionEngine();
         ConfigurationValidator.validatePigProperties(exe.getConfiguration());
@@ -137,6 +142,11 @@
             failedJobs.addAll(jc.getFailedJobs());
             succJobs.addAll(jc.getSuccessfulJobs());
             jcc.moveResults();
+            
+            stats.setJobClient(jobClient);
+            stats.setJobControl(jc);
+            stats.accumulateStats();
+            
             jc.stop(); 
         }
 
@@ -146,7 +156,7 @@
             for (Job fj : failedJobs) {
                 getStats(fj, jobClient, true, pc);
             }
-            return false;
+            return null;
         }
 
         Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>();
@@ -163,10 +173,18 @@
         if(aggregateWarning) {
         	CompilationMessageCollector.logAggregate(warningAggMap, MessageType.Warning, log) ;
         }
+        
+        if(stats.getPigStats().get(stats.getLastJobID()) == null)
+            log.warn("Jobs not found in the JobClient. Please try to use Local, Hadoop Distributed or Hadoop MiniCluster modes instead of Hadoop LocalExecution");
+        else {
+            log.info("Records written : " + stats.getRecordsWritten());
+            log.info("Bytes written : " + stats.getBytesWritten());
+        }
 
         log.info( "100% complete");
         log.info("Success!");
-        return true;
+        return stats;
     }
 
     @Override

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Mon May 11 20:46:56 2009
@@ -55,6 +55,8 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.local.executionengine.physicalLayer.LocalLogToPhyTranslationVisitor;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.tools.pigstats.PigStats;
+
 import java.util.Iterator;
 
 public class LocalExecutionEngine implements ExecutionEngine {
@@ -160,10 +162,10 @@
             }
 
             LocalPigLauncher launcher = new LocalPigLauncher();
-            boolean success = launcher.launchPig(plan, jobName, pigContext);
-            if (success)
+            PigStats stats = launcher.launchPig(plan, jobName, pigContext);
+            if (stats != null)
                 return new LocalJob(ExecJob.JOB_STATUS.COMPLETED, pigContext,
-                        spec);
+                        spec, stats);
             else
                 return new LocalJob(ExecJob.JOB_STATUS.FAILED, pigContext, null);
         } catch (Exception e) {

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalJob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalJob.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalJob.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalJob.java Mon May 11 20:46:56 2009
@@ -34,6 +34,7 @@
 import org.apache.pig.LoadFunc;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.tools.pigstats.PigStats;
 
 
 public class LocalJob implements ExecJob {
@@ -43,6 +44,7 @@
     protected JOB_STATUS status;
     protected PigContext pigContext;
     protected FileSpec outFileSpec;
+    private PigStats stats;
     
     public LocalJob(JOB_STATUS status,
                 PigContext pigContext,
@@ -52,6 +54,16 @@
         this.outFileSpec = outFileSpec;
     }
     
+    public LocalJob(JOB_STATUS status,
+            PigContext pigContext,
+            FileSpec outFileSpec,
+            PigStats stats) {
+        this.status = status;
+        this.pigContext = pigContext;
+        this.outFileSpec = outFileSpec;
+        this.stats = stats;
+    } 
+    
     public JOB_STATUS getStatus() {
         return status;
     }
@@ -122,8 +134,9 @@
         return props;
     }
 
-    public Map<String, Object> getStatistics() {
-        throw new UnsupportedOperationException();
+    public PigStats getStatistics() {
+        return stats;
     }
 
     public void completionNotification(Object cookie) {

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java Mon May 11 20:46:56 2009
@@ -42,6 +42,7 @@
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 
 public class LocalPigLauncher extends Launcher {
@@ -58,7 +59,7 @@
     }
 
     @Override
-    public boolean launchPig(PhysicalPlan php, String grpName, PigContext pc)
+    public PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc)
             throws PlanException, VisitorException, IOException, ExecException,
             JobCreationException {
 
@@ -69,6 +70,10 @@
 
         int noJobs = stores.size();
         int failedJobs = 0;
+        
+        PigStats stats = new PigStats();
+        stats.setPhysicalPlan(php);
+        stats.setExecType(pc.getExecType());
 
         for (POStore op : stores) {
             op.setStoreImpl(new LocalPOStoreImpl(pc));
@@ -100,23 +105,26 @@
                 
         // The remaining stores can be run together.
         failedJobs += runPipeline(stores.toArray(new POStore[0]));
+        
+        stats.accumulateStats();
 
         UDFFinishVisitor finisher = new UDFFinishVisitor(php, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(php));
         finisher.visit();
 
         if (failedJobs == 0) {
+            log.info("Records written : " + stats.getRecordsWritten());
+            log.info("Bytes written : " + stats.getBytesWritten());
             log.info("100% complete!");
             log.info("Success!!");
-            return true;
+            return stats;
         } else {
             log.info("Failed jobs!!");
             log.info(failedJobs + " out of " + noJobs + " failed!");
         }
-        return false;
+        return null;
 
     }
     
-    
 
     private int runPipeline(POStore[] leaves) throws IOException, ExecException {
         BitSet bs = new BitSet(leaves.length);

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java Mon May 11 20:46:56 2009
@@ -26,22 +26,28 @@
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrangeForIllustrate;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter;
 import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCogroup;
 import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCross;
 import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplitOutput;
 import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POStreamLocal;
 import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.LOCogroup;
 import org.apache.pig.impl.logicalLayer.LOCross;
 import org.apache.pig.impl.logicalLayer.LOSplit;
 import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LOStore;
 import org.apache.pig.impl.logicalLayer.LOStream;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.PlanWalker;
@@ -215,5 +221,45 @@
         }
         //currentPlan.explain(System.out);
     }
+    
+    @Override
+    public void visit(LOStore loStore) throws VisitorException {
+        String scope = loStore.getOperatorKey().scope;
+        POStore store = new POStore(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)));
+        store.setSFile(loStore.getOutputFile());
+        store.setInputSpec(loStore.getInputSpec());
+        try {
+            // create a new schema for ourselves so that when
+            // we serialize we are not serializing objects that
+            // contain the schema - apparently Java tries to
+            // serialize the object containing the schema if
+            // we are trying to serialize the schema reference in
+            // the containing object. The schema here will be serialized
+            // in JobControlCompiler
+            store.setSchema(new Schema(loStore.getSchema()));
+        } catch (FrontendException e1) {
+            int errorCode = 1060;
+            String message = "Cannot resolve Store output schema";  
+            throw new VisitorException(message, errorCode, PigException.BUG, e1);    
+        }
+        //store.setPc(pc);
+        currentPlan.add(store);
+        PhysicalOperator from = LogToPhyMap.get(loStore
+                .getPlan().getPredecessors(loStore).get(0));
+        
+        POCounter counter = new POCounter(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
+        currentPlan.add(counter);
+        try {
+            currentPlan.connect(from, counter);
+            currentPlan.connect(counter, store);
+        } catch (PlanException e) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+        LogToPhyMap.put(loStore, store);
+        
+    }
 
 }

Added: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/counters/POCounter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/counters/POCounter.java?rev=773683&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/counters/POCounter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/counters/POCounter.java Mon May 11 20:46:56 2009
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.local.executionengine.physicalLayer.counters;
+
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class POCounter extends PhysicalOperator {
+
+    static final long serialVersionUID = 1L;
+
+    private long count = 0;
+    
+    public POCounter(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+
+    public POCounter(OperatorKey k, int rp) {
+        super(k, rp);
+    }
+
+    public POCounter(OperatorKey k, List<PhysicalOperator> inp) {
+        super(k, inp);
+    }
+
+    public POCounter(OperatorKey k) {
+        super(k);
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        // Nothing to do: POCounter is a pass-through operator and is not
+        // visited by any plan visitor.
+    }
+
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        return getNext(processInput());
+    }
+    
+    private Result getNext(Result res) {
+        // Count every tuple that flows through successfully.
+        if(res.returnStatus == POStatus.STATUS_OK) {
+            count++;
+        }
+        return res;
+    }
+
+    @Override
+    public String name() {
+        return "POCounter - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+    
+    public long getCount() {
+        return count;
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=773683&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Mon May 11 20:46:56 2009
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+public class PigStats {
+    MROperPlan mrp;
+    PhysicalPlan php;
+    JobControl jc;
+    JobClient jobClient;
+    Map<String, Map<String, String>> stats = new HashMap<String, Map<String,String>>();
+    String lastJobID;
+    ExecType mode;
+    
+    public void setMROperatorPlan(MROperPlan mrp) {
+        this.mrp = mrp;
+    }
+    
+    public void setJobControl(JobControl jc) {
+        this.jc = jc;
+    }
+    
+    public void setJobClient(JobClient jobClient) {
+        this.jobClient = jobClient;
+    }
+    
+    public String getMRPlan() {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        mrp.dump(new PrintStream(baos));
+        return baos.toString();
+    }
+    
+    public void setExecType(ExecType mode) {
+        this.mode = mode;
+    }
+    
+    public void setPhysicalPlan(PhysicalPlan php) {
+        this.php = php;
+    }
+    
+    public String getPhysicalPlan() {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        php.explain(baos);
+        return baos.toString();
+    }
+    
+    public Map<String, Map<String, String>> accumulateStats() throws ExecException {
+        if(mode == ExecType.MAPREDUCE)
+            return accumulateMRStats();
+        else if(mode == ExecType.LOCAL)
+            return accumulateLocalStats();
+        else
+            throw new RuntimeException("Unrecognized mode. Either MapReduce or Local mode expected.");
+    }
+    
+    private Map<String, Map<String, String>> accumulateLocalStats() {
+        //The counter placed before a store in the local plan should be able to get the number of records
+        for(PhysicalOperator op : php.getLeaves()) {
+            Map<String, String> jobStats = new HashMap<String, String>();
+            stats.put(op.toString(), jobStats);
+            POCounter counter = (POCounter) php.getPredecessors(op).get(0);
+            jobStats.put("PIG_STATS_LOCAL_OUTPUT_RECORDS", (Long.valueOf(counter.getCount())).toString());
+            jobStats.put("PIG_STATS_LOCAL_BYTES_WRITTEN", (Long.valueOf((new File(((POStore)op).getSFile().getFileName())).length())).toString());
+        }
+        return stats;
+    }
+    
+    private Map<String, Map<String, String>> accumulateMRStats() throws ExecException {
+        Job lastJob = getLastJob(jc.getSuccessfulJobs());
+        
+        for(Job job : jc.getSuccessfulJobs()) {
+            JobConf jobConf = job.getJobConf();
+            
+            RunningJob rj = null;
+            try {
+                rj = jobClient.getJob(job.getAssignedJobID());
+            } catch (IOException e1) {
+                String error = "Unable to get the job statistics from JobClient.";
+                throw new ExecException(error, e1);
+            }
+            if(rj == null)
+                continue;
+            
+            Map<String, String> jobStats = new HashMap<String, String>();
+            stats.put(job.getAssignedJobID().toString(), jobStats);
+            
+            try {
+                PhysicalPlan plan = (PhysicalPlan) ObjectSerializer.deserialize(jobConf.get("pig.mapPlan"));
+                jobStats.put("PIG_STATS_MAP_PLAN", plan.toString());
+                plan = (PhysicalPlan) ObjectSerializer.deserialize(jobConf.get("pig.combinePlan"));
+                if(plan != null) {
+                    jobStats.put("PIG_STATS_COMBINE_PLAN", plan.toString());
+                }
+                plan = (PhysicalPlan) ObjectSerializer.deserialize(jobConf.get("pig.reducePlan"));
+                if(plan != null) {
+                    jobStats.put("PIG_STATS_REDUCE_PLAN", plan.toString());
+                }
+            } catch (IOException e2) {
+                String error = "Error deserializing plans from the JobConf.";
+                throw new RuntimeException(error, e2);
+            }
+            
+            try {
+                Counters counters = rj.getCounters();
+                Counters.Group taskgroup = counters.getGroup("org.apache.hadoop.mapred.Task$Counter");
+                Counters.Group hdfsgroup = counters.getGroup("org.apache.hadoop.mapred.Task$FileSystemCounter");
+                
+                jobStats.put("PIG_STATS_MAP_INPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("MAP_INPUT_RECORDS").getCounter())).toString());
+                jobStats.put("PIG_STATS_MAP_OUTPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("MAP_OUTPUT_RECORDS").getCounter())).toString());
+                jobStats.put("PIG_STATS_REDUCE_INPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("REDUCE_INPUT_RECORDS").getCounter())).toString());
+                jobStats.put("PIG_STATS_REDUCE_OUTPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("REDUCE_OUTPUT_RECORDS").getCounter())).toString());
+                jobStats.put("PIG_STATS_BYTES_WRITTEN", (Long.valueOf(hdfsgroup.getCounterForName("HDFS_WRITE").getCounter())).toString());
+            } catch (IOException e) {
+                String error = "Unable to get the counters.";
+                throw new ExecException(error, e);
+            }
+        }
+        
+        if(lastJob != null)
+            lastJobID = lastJob.getAssignedJobID().toString();
+        return stats;
+    }
+    
+
+    private Job getLastJob(List<Job> jobs) {
+         Set<Job> temp = new HashSet<Job>();
+         for(Job job : jobs) {
+             if(job.getDependingJobs() != null && job.getDependingJobs().size() > 0)
+                 temp.addAll(job.getDependingJobs());
+         }
+         
+         //difference between temp and jobs would be the set of leaves
+         //we can safely assume there would be only one leaf
+         for(Job job : jobs) {
+             if(temp.contains(job))
+                 continue;
+             else
+                 //this means a leaf
+                 return job;
+         }
+         return null;
+    }
+    
+    public String getLastJobID() {
+        return lastJobID;
+    }
+    
+    public Map<String, Map<String, String>> getPigStats() {
+        return stats;
+    }
+    
+    public long getRecordsWritten() {
+        if(mode == ExecType.LOCAL)
+            return getRecordsCountLocal();
+        else if(mode == ExecType.MAPREDUCE)
+            return getRecordsCountMR();
+        else
+            throw new RuntimeException("Unrecognized mode. Either MapReduce or Local mode expected.");
+    }
+    
+    private long getRecordsCountLocal() {
+        //because of the nature of the parser, there will always be only one store
+        for(PhysicalOperator op : php.getLeaves()) {
+            return Long.parseLong(stats.get(op.toString()).get("PIG_STATS_LOCAL_OUTPUT_RECORDS"));
+        }
+        return 0;
+    }
+    
+    /**
+     * Returns the number of records written by the Pig script in MR mode.
+     * @return output record count of the last (leaf) job in the plan
+     */
+    private long getRecordsCountMR() {
+        String reducePlan = stats.get(lastJobID).get("PIG_STATS_REDUCE_PLAN");
+        String records = null;
+        if(reducePlan == null) {
+            records = stats.get(lastJobID).get("PIG_STATS_MAP_OUTPUT_RECORDS");
+        } else {
+            records = stats.get(lastJobID).get("PIG_STATS_REDUCE_OUTPUT_RECORDS");
+        }
+        return Long.parseLong(records);
+    }
+    
+    public long getBytesWritten() {
+        if(mode == ExecType.LOCAL) {
+            return getLocalBytesWritten();
+        } else if(mode == ExecType.MAPREDUCE) {
+            return getMapReduceBytesWritten();
+        } else {
+            throw new RuntimeException("Unrecognized mode. Either MapReduce or Local mode expected.");
+        }
+    }
+    
+    private long getLocalBytesWritten() {
+        for(PhysicalOperator op : php.getLeaves())
+            return Long.parseLong(stats.get(op.toString()).get("PIG_STATS_LOCAL_BYTES_WRITTEN"));
+        return 0;
+    }
+    
+    private long getMapReduceBytesWritten() {
+        return Long.parseLong(stats.get(lastJobID).get("PIG_STATS_BYTES_WRITTEN"));
+    }
+    
+}
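
A note on the layout of the statistics above: PigStats keeps a nested map of
strings, keyed in MapReduce mode by the Hadoop job id and, inside that, by the
PIG_STATS_* counter names. A hedged sketch of reading it directly, assuming
'job' is the ExecJob returned by PigServer.store():

    PigStats stats = job.getStatistics();
    Map<String, Map<String, String>> perJob = stats.getPigStats();
    // The leaf (final) MR job of the plan is named by getLastJobID().
    Map<String, String> lastJob = perJob.get(stats.getLastJobID());
    long bytesWritten = Long.parseLong(lastJob.get("PIG_STATS_BYTES_WRITTEN"));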

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java?rev=773683&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java Mon May 11 20:46:56 2009
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCounters extends TestCase {
+    String file = "input.txt";
+
+    MiniCluster cluster = MiniCluster.buildCluster();
+
+    final int MAX = 100*1000;
+    Random r = new Random();
+
+    @Test
+    public void testMapOnly() throws IOException, ExecException {
+        int count = 0;
+        PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
+        for(int i = 0; i < MAX; i++) {
+            int t = r.nextInt(100);
+            pw.println(t);
+            if(t > 50)
+                count ++;
+        }
+        pw.close();
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer.registerQuery("a = load '" + file + "';");
+        pigServer.registerQuery("b = filter a by $0 > 50;");
+        pigServer.registerQuery("c = foreach b generate $0 - 50;");
+        PigStats pigStats = pigServer.store("c", "output_map_only").getStatistics();
+
+        Map<String, Map<String, String>> stats = pigStats.getPigStats();
+        
+        //counting the no. of bytes in the output file
+        //long filesize = cluster.getFileSystem().getFileStatus(new Path("output_map_only")).getLen();
+        InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output_map_only", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+        long filesize = 0;
+        while(is.read() != -1) filesize++;
+        
+        is.close();
+        
+        cluster.getFileSystem().delete(new Path(file), true);
+        cluster.getFileSystem().delete(new Path("output_map_only"), true);
+
+        System.out.println("============================================");
+        System.out.println("Test case Map Only");
+        System.out.println("============================================");
+        System.out.println("MRPlan : \n" + pigStats.getMRPlan());
+        for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
+            System.out.println("============================================");
+            System.out.println("Job : " + entry.getKey());
+            for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
+                System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
+            }
+            System.out.println("============================================");
+        }
+
+        Map.Entry<String, Map<String, String>> e = stats.entrySet().iterator().next();
+
+        Map<String, String> jobStats = e.getValue();
+
+        System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
+        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
+        System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
+        assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
+        assertNull(jobStats.get("PIG_STATS_REDUCE_PLAN"));
+        assertNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
+        assertNotNull(jobStats.get("PIG_STATS_MAP_PLAN"));
+        assertEquals(0, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
+        assertEquals(0, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
+
+        assertEquals(count, pigStats.getRecordsWritten());
+        assertEquals(filesize, pigStats.getBytesWritten());
+
+    }
+
+    @Test
+    public void testMapOnlyBinStorage() throws IOException, ExecException {
+        int count = 0;
+        PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
+        for(int i = 0; i < MAX; i++) {
+            int t = r.nextInt(100);
+            pw.println(t);
+            if(t > 50)
+                count ++;
+        }
+        pw.close();
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer.registerQuery("a = load '" + file + "';");
+        pigServer.registerQuery("b = filter a by $0 > 50;");
+        pigServer.registerQuery("c = foreach b generate $0 - 50;");
+        PigStats pigStats = pigServer.store("c", "output_map_only", "BinStorage").getStatistics();
+        
+        InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output_map_only", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+        long filesize = 0;
+        while(is.read() != -1) filesize++;
+        
+        is.close();
+
+        Map<String, Map<String, String>> stats = pigStats.getPigStats();
+        cluster.getFileSystem().delete(new Path(file), true);
+        cluster.getFileSystem().delete(new Path("output_map_only"), true);
+
+        System.out.println("============================================");
+        System.out.println("Test case Map Only");
+        System.out.println("============================================");
+        System.out.println("MRPlan : \n" + pigStats.getMRPlan());
+        for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
+            System.out.println("============================================");
+            System.out.println("Job : " + entry.getKey());
+            for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
+                System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
+            }
+            System.out.println("============================================");
+        }
+
+        Map.Entry<String, Map<String, String>> e = stats.entrySet().iterator().next();
+
+        Map<String, String> jobStats = e.getValue();
+
+        System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
+        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
+        System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
+        assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
+        assertNull(jobStats.get("PIG_STATS_REDUCE_PLAN"));
+        assertNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
+        assertNotNull(jobStats.get("PIG_STATS_MAP_PLAN"));
+        assertEquals(0, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
+        assertEquals(0, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
+
+        assertEquals(count, pigStats.getRecordsWritten());
+        assertEquals(filesize, pigStats.getBytesWritten());
+    }
+
+    @Test
+    public void testMapReduceOnly() throws IOException, ExecException {
+        int count = 0;
+        PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
+        int [] nos = new int[10];
+        for(int i = 0; i < 10; i++)
+            nos[i] = 0;
+
+        for(int i = 0; i < MAX; i++) {
+            int index = r.nextInt(10);
+            int value = r.nextInt(100);
+            nos[index] += value;
+            pw.println(index + "\t" + value);
+        }
+        pw.close();
+
+        for(int i = 0; i < 10; i++) 
+            if(nos[i] > 0)
+                count ++;
+
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer.registerQuery("a = load '" + file + "';");
+        pigServer.registerQuery("b = group a by $0;");
+        pigServer.registerQuery("c = foreach b generate group;");
+        PigStats pigStats = pigServer.store("c", "output").getStatistics();
+        InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+        long filesize = 0;
+        while(is.read() != -1) filesize++;
+        
+        is.close();
+
+        Map<String, Map<String, String>> stats = pigStats.getPigStats();
+        cluster.getFileSystem().delete(new Path(file), true);
+        cluster.getFileSystem().delete(new Path("output"), true);
+
+        System.out.println("============================================");
+        System.out.println("Test case MapReduce");
+        System.out.println("============================================");
+        System.out.println("MRPlan : \n" + pigStats.getMRPlan());
+        for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
+            System.out.println("============================================");
+            System.out.println("Job : " + entry.getKey());
+            for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
+                System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
+            }
+            System.out.println("============================================");
+        }
+
+        Map.Entry<String, Map<String, String>> e = stats.entrySet().iterator().next();
+
+        Map<String, String> jobStats = e.getValue();
+
+        System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
+        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
+        System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
+        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
+        System.out.println("Reduce input records : " + jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS"));
+        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
+        System.out.println("Reduce output records : " + jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"));
+        assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
+
+        assertNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
+        assertNotNull(jobStats.get("PIG_STATS_MAP_PLAN"));
+        assertNotNull(jobStats.get("PIG_STATS_REDUCE_PLAN"));
+
+        assertEquals(count, pigStats.getRecordsWritten());
+        assertEquals(filesize, pigStats.getBytesWritten());
+    }
+
+    @Test
+    public void testMapReduceOnlyBinStorage() throws IOException, ExecException {
+        int count = 0;
+        PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
+        int [] nos = new int[10];
+        for(int i = 0; i < 10; i++)
+            nos[i] = 0;
+
+        for(int i = 0; i < MAX; i++) {
+            int index = r.nextInt(10);
+            int value = r.nextInt(100);
+            nos[index] += value;
+            pw.println(index + "\t" + value);
+        }
+        pw.close();
+
+        for(int i = 0; i < 10; i++) 
+            if(nos[i] > 0)
+                count ++;
+
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer.registerQuery("a = load '" + file + "';");
+        pigServer.registerQuery("b = group a by $0;");
+        pigServer.registerQuery("c = foreach b generate group;");
+        PigStats pigStats = pigServer.store("c", "output", "BinStorage").getStatistics();
+
+        InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+        long filesize = 0;
+        while(is.read() != -1) filesize++;
+        
+        is.close();
+        
+        Map<String, Map<String, String>> stats = pigStats.getPigStats();
+        cluster.getFileSystem().delete(new Path(file), true);
+        cluster.getFileSystem().delete(new Path("output"), true);
+
+        System.out.println("============================================");
+        System.out.println("Test case MapReduce");
+        System.out.println("============================================");
+        System.out.println("MRPlan : \n" + pigStats.getMRPlan());
+        for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
+            System.out.println("============================================");
+            System.out.println("Job : " + entry.getKey());
+            for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
+                System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
+            }
+            System.out.println("============================================");
+        }
+
+        Map.Entry<String, Map<String, String>> e = stats.entrySet().iterator().next();
+
+        Map<String, String> jobStats = e.getValue();
+
+        System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
+        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
+        System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
+        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
+        System.out.println("Reduce input records : " + jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS"));
+        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
+        System.out.println("Reduce output records : " + jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"));
+        assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
+
+        assertNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
+        assertNotNull(jobStats.get("PIG_STATS_MAP_PLAN"));
+        assertNotNull(jobStats.get("PIG_STATS_REDUCE_PLAN"));
+        
+        assertEquals(count, pigStats.getRecordsWritten());
+        assertEquals(filesize, pigStats.getBytesWritten());
+
+    }
+
+    @Test
+    public void testMapCombineReduce() throws IOException, ExecException {
+        int count = 0;
+        PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
+        int [] nos = new int[10];
+        for(int i = 0; i < 10; i++)
+            nos[i] = 0;
+
+        for(int i = 0; i < MAX; i++) {
+            int index = r.nextInt(10);
+            int value = r.nextInt(100);
+            nos[index] += value;
+            pw.println(index + "\t" + value);
+        }
+        pw.close();
+
+        for(int i = 0; i < 10; i++) 
+            if(nos[i] > 0)
+                count ++;
+
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer.registerQuery("a = load '" + file + "';");
+        pigServer.registerQuery("b = group a by $0;");
+        pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
+        PigStats pigStats = pigServer.store("c", "output").getStatistics();
+
+        InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+        long filesize = 0;
+        while(is.read() != -1) filesize++;
+        
+        is.close();
+        Map<String, Map<String, String>> stats = pigStats.getPigStats();
+        cluster.getFileSystem().delete(new Path(file), true);
+        cluster.getFileSystem().delete(new Path("output"), true);
+
+        System.out.println("============================================");
+        System.out.println("Test case MapCombineReduce");
+        System.out.println("============================================");
+        System.out.println("MRPlan : \n" + pigStats.getMRPlan());
+        for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
+            System.out.println("============================================");
+            System.out.println("Job : " + entry.getKey());
+            for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
+                System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
+            }
+            System.out.println("============================================");
+        }
+
+        Map.Entry<String, Map<String, String>> e = stats.entrySet().iterator().next();
+
+        Map<String, String> jobStats = e.getValue();
+
+        System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
+        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
+        System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
+        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
+        System.out.println("Reduce input records : " + jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS"));
+        assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
+        System.out.println("Reduce output records : " + jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"));
+        assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
+
+        assertNotNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
+        assertNotNull(jobStats.get("PIG_STATS_MAP_PLAN"));
+        assertNotNull(jobStats.get("PIG_STATS_REDUCE_PLAN"));
+
+        assertEquals(count, pigStats.getRecordsWritten());
+        assertEquals(filesize, pigStats.getBytesWritten());
+    }
+
+    @Test
+    public void testMapCombineReduceBinStorage() throws IOException, ExecException {
+        int count = 0;
+        PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
+        int [] nos = new int[10];
+        for(int i = 0; i < 10; i++)
+            nos[i] = 0;
+
+        for(int i = 0; i < MAX; i++) {
+            int index = r.nextInt(10);
+            int value = r.nextInt(100);
+            nos[index] += value;
+            pw.println(index + "\t" + value);
+        }
+        pw.close();
+
+        for(int i = 0; i < 10; i++) 
+            if(nos[i] > 0)
+                count ++;
+
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer.registerQuery("a = load '" + file + "';");
+        pigServer.registerQuery("b = group a by $0;");
+        pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
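+        // same pipeline as the previous test, but the result is stored through BinStorage instead of the default PigStorage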
+        PigStats pigStats = pigServer.store("c", "output", "BinStorage").getStatistics();
+
+        InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+        long filesize = 0;
+        while(is.read() != -1) filesize++;
+        
+        is.close();
+        Map<String, Map<String, String>> stats = pigStats.getPigStats();
+        cluster.getFileSystem().delete(new Path(file), true);
+        cluster.getFileSystem().delete(new Path("output"), true);
+
+        System.out.println("============================================");
+        System.out.println("Test case MapCombineReduce");
+        System.out.println("============================================");
+        System.out.println("MRPlan : \n" + pigStats.getMRPlan());
+        for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
+            System.out.println("============================================");
+            System.out.println("Job : " + entry.getKey());
+            for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
+                System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
+            }
+            System.out.println("============================================");
+        }
+
+        Map.Entry<String, Map<String, String>> e = stats.entrySet().iterator().next();
+
+        Map<String, String> jobStats = e.getValue();
+
+        System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
+        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
+        System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
+        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
+        System.out.println("Reduce input records : " + jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS"));
+        assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
+        System.out.println("Reduce output records : " + jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"));
+        assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
+
+        assertNotNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
+        assertNotNull(jobStats.get("PIG_STATS_MAP_PLAN"));
+        assertNotNull(jobStats.get("PIG_STATS_REDUCE_PLAN"));
+
+        assertEquals(count, pigStats.getRecordsWritten());
+        assertEquals(filesize, pigStats.getBytesWritten());
+    }
+
+    @Test
+    public void testMultipleMRJobs() throws IOException, ExecException {
+        int count = 0;
+        PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
+        int[] nos = new int[10];
+        for(int i = 0; i < 10; i++)
+            nos[i] = 0;
+
+        for(int i = 0; i < MAX; i++) {
+            int index = r.nextInt(10);
+            int value = r.nextInt(100);
+            nos[index] += value;
+            pw.println(index + "\t" + value);
+        }
+        pw.close();
+
+        for(int i = 0; i < 10; i++) 
+            if(nos[i] > 0)
+                count++;
+
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer.registerQuery("a = load '" + file + "';");
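+        // the added 'order by' forces the script to compile into more than one MR job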
+        pigServer.registerQuery("b = order a by $0;");
+        pigServer.registerQuery("c = group b by $0;");
+        pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
+        PigStats pigStats = pigServer.store("d", "output").getStatistics();
+        
+        InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+        long filesize = 0;
+        while(is.read() != -1) filesize++;
+        
+        is.close();
+        Map<String, Map<String, String>> stats = pigStats.getPigStats();
+        cluster.getFileSystem().delete(new Path(file), true);
+        cluster.getFileSystem().delete(new Path("output"), true);
+        
+        System.out.println("============================================");
+        System.out.println("Test case MultipleMRJobs");
+        System.out.println("============================================");
+        System.out.println("MRPlan : \n" + pigStats.getMRPlan());
+        for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
+            System.out.println("============================================");
+            System.out.println("Job : " + entry.getKey());
+            for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
+                System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
+            }
+            System.out.println("============================================");
+        }
+
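+        // with several jobs in the plan, fetch the stats of the final (group + SUM) job by its id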
+        Map<String, String> jobStats = stats.get(pigStats.getLastJobID());
+
+        System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
+        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
+        System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
+        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
+        System.out.println("Reduce input records : " + jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS"));
+        assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
+        System.out.println("Reduce output records : " + jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"));
+        assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
+        
+        assertEquals(count, pigStats.getRecordsWritten());
+        assertEquals(filesize, pigStats.getBytesWritten());
+
+    }
+    
+    @Test
+    public void testLocal() throws IOException, ExecException {
+        int count = 0;
+        // local mode writes to an ordinary temp file rather than the minicluster dfs
+        File file = File.createTempFile("data", ".txt");
+        PrintWriter pw = new PrintWriter(new FileOutputStream(file));
+        int[] nos = new int[10];
+        for(int i = 0; i < 10; i++)
+            nos[i] = 0;
+
+        for(int i = 0; i < MAX; i++) {
+            int index = r.nextInt(10);
+            int value = r.nextInt(100);
+            nos[index] += value;
+            pw.println(index + "\t" + value);
+        }
+        pw.close();
+
+        for(int i = 0; i < 10; i++) 
+            if(nos[i] > 0)
+                count++;
+
+        File out = File.createTempFile("output", ".txt");
+        out.delete();
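+        // local mode has no hadoop counters; the stats come from the new POCounter operator instead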
+        PigServer pigServer = new PigServer("local");
+        pigServer.registerQuery("a = load '" + file + "';");
+        pigServer.registerQuery("b = order a by $0;");
+        pigServer.registerQuery("c = group b by $0;");
+        pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
+        PigStats pigStats = pigServer.store("d", out.getAbsolutePath()).getStatistics();
+        InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+        long filesize = 0;
+        while(is.read() != -1) filesize++;
+        
+        is.close();
+        out.delete();
+        
+        //Map<String, Map<String, String>> stats = pigStats.getPigStats();
+        
+        assertEquals(count, pigStats.getRecordsWritten());
+        assertEquals(filesize, pigStats.getBytesWritten());
+
+    }
+
+}

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java Mon May 11 20:46:56 2009
@@ -85,7 +85,7 @@
             LogicalPlan lp = checkLogicalPlan(1, 2, 9);
 
             // XXX Physical plan has one less node in the local case
-            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 10);
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 12);
 
             Assert.assertTrue(executePlan(pp));
 
@@ -142,7 +142,7 @@
     public void testMultiQueryWithTwoStores2Execs() {
 
         System.out.println("===== test multi-query with 2 stores (2) =====");
-
+        
         try {
             myPig.setBatchOn();
 
@@ -185,7 +185,7 @@
 
             LogicalPlan lp = checkLogicalPlan(1, 3, 14);
 
-            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 14);
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 17);
 
             Assert.assertTrue(executePlan(pp));
 
@@ -247,7 +247,7 @@
             LogicalPlan lp = checkLogicalPlan(2, 3, 16);
 
             // XXX the total number of ops is one less in the local case
-            PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 18);
+            PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 21);
 
             Assert.assertTrue(executePlan(pp));
 
@@ -458,7 +458,7 @@
             myPig.registerQuery("store c into '/tmp/output5';");
 
             LogicalPlan lp = checkLogicalPlan(1, 3, 14);
-            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 16);
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 21);
 
             myPig.executeBatch();
             myPig.discardBatch(); 

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java Mon May 11 20:46:56 2009
@@ -49,6 +49,7 @@
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.backend.local.executionengine.LocalPigLauncher;
 import org.apache.pig.backend.local.executionengine.LocalPOStoreImpl;
+import org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -59,6 +60,7 @@
 import org.apache.pig.test.utils.GenPhyOp;
 import org.apache.pig.test.utils.GenRandomData;
 import org.apache.pig.test.utils.TestHelper;
+import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.impl.logicalLayer.LOStore;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -79,10 +81,12 @@
     PigContext pc;
     POProject proj;
     PigServer pig;
+    POCounter pcount;
     
     @Before
     public void setUp() throws Exception {
         st = GenPhyOp.topStoreOp();
+        pcount = new POCounter(new OperatorKey("", (new Random()).nextLong()));
         fSpec = new FileSpec("file:/tmp/storeTest.txt",
                       new FuncSpec(PigStorage.class.getName(), new String[]{":"}));
         st.setSFile(fSpec);
@@ -104,11 +108,15 @@
     public void tearDown() throws Exception {
     }
 
-    private boolean store() throws Exception {
+    private PigStats store() throws Exception {
         PhysicalPlan pp = new PhysicalPlan();
         pp.add(proj);
         pp.add(st);
-        pp.connect(proj, st);
+        pp.add(pcount);
+        // route each tuple through the counter operator on its way to the store
+        pp.connect(proj, pcount);
+        pp.connect(pcount, st);
+        pc.setExecType(ExecType.LOCAL);
         return new LocalPigLauncher().launchPig(pp, "TestStore", pc);
     }
 
@@ -118,7 +126,7 @@
         Tuple t = new DefaultTuple();
         t.append(inpDB);
         proj.attachInput(t);
-        assertTrue(store());
+        assertNotNull(store());
         
         int size = 0;
         BufferedReader br = new BufferedReader(new FileReader("/tmp/storeTest.txt"));
@@ -145,7 +153,7 @@
         Tuple t = new DefaultTuple();
         t.append(inpDB);
         proj.attachInput(t);
-        assertTrue(store());
+        assertNotNull(store());
         PigStorage ps = new PigStorage(":");
         
         int size = 0;
@@ -178,7 +186,7 @@
         Tuple t = new DefaultTuple();
         t.append(inpDB);
         proj.attachInput(t);
-        assertTrue(store());
+        assertNotNull(store());
         PigStorage ps = new PigStorage(":");
         
         int size = 0;

Modified: hadoop/pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/Util.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/Util.java Mon May 11 20:46:56 2009
@@ -25,6 +25,7 @@
 import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.util.ArrayList;
@@ -237,6 +238,28 @@
     }
     
     /**
+     * Helper to create a file on the MiniCluster DFS. This returns an
+     * OutputStream that test cases can use to write data.
+     * 
+     * @param cluster
+     *            reference to the MiniCluster where the file should be created
+     * @param fileName
+     *            pathname of the file to be created
+     * @return OutputStream to write any data to the file created on the
+     *         MiniCluster.
+     * @throws IOException
+     */
+    static public OutputStream createInputFile(MiniCluster cluster,
+            String fileName) throws IOException {
+        FileSystem fs = cluster.getFileSystem();
+        if (fs.exists(new Path(fileName))) {
+            throw new IOException("File " + fileName
+                    + " already exists on the minicluster");
+        }
+        return fs.create(new Path(fileName));
+    }
+    
+    /**
      * Helper to remove a dfs file from the minicluster DFS
      * 
      * @param miniCluster reference to the Minicluster where the file should be deleted