You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2009/05/11 22:46:57 UTC
svn commit: r773683 - in /hadoop/pig/trunk: ./
src/org/apache/pig/backend/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/local/executionen...
Author: gates
Date: Mon May 11 20:46:56 2009
New Revision: 773683
URL: http://svn.apache.org/viewvc?rev=773683&view=rev
Log:
PIG-626: Add access to hadoop counters.
Added:
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/counters/
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/counters/POCounter.java
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalJob.java
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java
hadoop/pig/trunk/test/org/apache/pig/test/Util.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon May 11 20:46:56 2009
@@ -40,6 +40,8 @@
PIG-701: Implement IVY for resolving pig dependencies (gkesavan)
+PIG-626: Add access to hadoop counters (shubhamc via gates).
+
BUG FIXES
PIG-733: Order by sampling dumps entire sample to hdfs which causes dfs
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java Mon May 11 20:46:56 2009
@@ -24,6 +24,7 @@
import java.io.OutputStream;
import org.apache.pig.data.Tuple;
+import org.apache.pig.tools.pigstats.PigStats;
/**
* Abstraction on a job that the execution engine runs. It allows the front-end to
@@ -78,7 +79,7 @@
*
* @return statistics relevant to the execution engine
*/
- public Map<String, Object> getStatistics();
+ public PigStats getStatistics();
/**
* hook for asynchronous notification of job completion pushed from the back-end
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Mon May 11 20:46:56 2009
@@ -72,6 +72,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.shock.SSHSocketImplFactory;
+import org.apache.pig.tools.pigstats.PigStats;
public class HExecutionEngine implements ExecutionEngine {
@@ -258,9 +259,9 @@
FileSpec spec = ExecTools.checkLeafIsStore(plan, pigContext);
MapReduceLauncher launcher = new MapReduceLauncher();
- boolean success = launcher.launchPig(plan, jobName, pigContext);
- if(success)
- return new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, spec);
+ PigStats stats = launcher.launchPig(plan, jobName, pigContext);
+ if(stats != null)
+ return new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, spec, stats);
else
return new HJob(ExecJob.JOB_STATUS.FAILED, pigContext, null);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java Mon May 11 20:46:56 2009
@@ -35,6 +35,7 @@
import org.apache.pig.PigException;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.tools.pigstats.PigStats;
public class HJob implements ExecJob {
@@ -44,6 +45,7 @@
protected JOB_STATUS status;
protected PigContext pigContext;
protected FileSpec outFileSpec;
+ private PigStats stats;
public HJob(JOB_STATUS status,
PigContext pigContext,
@@ -53,6 +55,16 @@
this.outFileSpec = outFileSpec;
}
+    /**
+     * Creates a job handle that also carries the statistics collected while
+     * the job ran.
+     *
+     * @param status      status of the job
+     * @param pigContext  context the job was run with
+     * @param outFileSpec output file spec of the job
+     * @param stats       statistics handed back by {@link #getStatistics()}
+     */
+    public HJob(JOB_STATUS status,
+                PigContext pigContext,
+                FileSpec outFileSpec,
+                PigStats stats) {
+        this.stats = stats;
+        this.outFileSpec = outFileSpec;
+        this.pigContext = pigContext;
+        this.status = status;
+    }
+
public JOB_STATUS getStatus() {
return status;
}
@@ -125,8 +137,9 @@
return props;
}
- public Map<String, Object> getStatistics() {
- throw new UnsupportedOperationException();
+    /**
+     * Returns the execution statistics attached to this job.
+     *
+     * @return the {@link PigStats} passed to the four-argument constructor, or
+     *         {@code null} if this job was built with the three-argument
+     *         constructor (as done for failed jobs)
+     */
+ public PigStats getStatistics() {
+ return stats;
+ }
public void completionNotification(Object cookie) {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java Mon May 11 20:46:56 2009
@@ -48,6 +48,7 @@
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.tools.pigstats.PigStats;
public abstract class Launcher {
private static final Log log = LogFactory.getLog(Launcher.class);
@@ -94,7 +95,7 @@
* @throws ExecException
* @throws JobCreationException
*/
- public abstract boolean launchPig(PhysicalPlan php, String grpName, PigContext pc)
+ public abstract PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc)
throws PlanException, VisitorException, IOException, ExecException,
JobCreationException, Exception;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Mon May 11 20:46:56 2009
@@ -56,6 +56,7 @@
import org.apache.pig.impl.plan.CompilationMessageCollector.Message;
import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
import org.apache.pig.impl.util.ConfigurationValidator;
+import org.apache.pig.tools.pigstats.PigStats;
/**
* Main class that launches pig for Map Reduce
@@ -69,7 +70,7 @@
private boolean aggregateWarning = false;
@Override
- public boolean launchPig(PhysicalPlan php,
+ public PigStats launchPig(PhysicalPlan php,
String grpName,
PigContext pc) throws PlanException,
VisitorException,
@@ -80,6 +81,10 @@
long sleepTime = 500;
aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
MROperPlan mrp = compile(php, pc);
+ PigStats stats = new PigStats();
+ stats.setMROperatorPlan(mrp);
+ stats.setExecType(pc.getExecType());
+ stats.setPhysicalPlan(php);
ExecutionEngine exe = pc.getExecutionEngine();
ConfigurationValidator.validatePigProperties(exe.getConfiguration());
@@ -137,6 +142,11 @@
failedJobs.addAll(jc.getFailedJobs());
succJobs.addAll(jc.getSuccessfulJobs());
jcc.moveResults();
+
+ stats.setJobClient(jobClient);
+ stats.setJobControl(jc);
+ stats.accumulateStats();
+
jc.stop();
}
@@ -146,7 +156,7 @@
for (Job fj : failedJobs) {
getStats(fj, jobClient, true, pc);
}
- return false;
+ return null;
}
Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>();
@@ -163,10 +173,18 @@
if(aggregateWarning) {
CompilationMessageCollector.logAggregate(warningAggMap, MessageType.Warning, log) ;
}
+
+ if(stats.getPigStats().get(stats.getLastJobID()) == null)
+ log
+ .warn("Jobs not found in the JobClient. Please try to use Local, Hadoop Distributed or Hadoop MiniCluster modes instead of Hadoop LocalExecution");
+ else {
+ log.info("Records written : " + stats.getRecordsWritten());
+ log.info("Bytes written : " + stats.getBytesWritten());
+ }
log.info( "100% complete");
log.info("Success!");
- return true;
+ return stats;
}
@Override
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Mon May 11 20:46:56 2009
@@ -55,6 +55,8 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.local.executionengine.physicalLayer.LocalLogToPhyTranslationVisitor;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.tools.pigstats.PigStats;
+
import java.util.Iterator;
public class LocalExecutionEngine implements ExecutionEngine {
@@ -160,10 +162,10 @@
}
LocalPigLauncher launcher = new LocalPigLauncher();
- boolean success = launcher.launchPig(plan, jobName, pigContext);
- if (success)
+ PigStats stats = launcher.launchPig(plan, jobName, pigContext);
+ if (stats != null)
return new LocalJob(ExecJob.JOB_STATUS.COMPLETED, pigContext,
- spec);
+ spec, stats);
else
return new LocalJob(ExecJob.JOB_STATUS.FAILED, pigContext, null);
} catch (Exception e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalJob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalJob.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalJob.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalJob.java Mon May 11 20:46:56 2009
@@ -34,6 +34,7 @@
import org.apache.pig.LoadFunc;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.tools.pigstats.PigStats;
public class LocalJob implements ExecJob {
@@ -43,6 +44,7 @@
protected JOB_STATUS status;
protected PigContext pigContext;
protected FileSpec outFileSpec;
+ private PigStats stats;
public LocalJob(JOB_STATUS status,
PigContext pigContext,
@@ -52,6 +54,16 @@
this.outFileSpec = outFileSpec;
}
+    /**
+     * Creates a local job handle that also carries the statistics collected
+     * while the job ran.
+     *
+     * @param status      status of the job
+     * @param pigContext  context the job was run with
+     * @param outFileSpec output file spec of the job
+     * @param stats       statistics handed back by {@link #getStatistics()}
+     */
+    public LocalJob(JOB_STATUS status,
+                    PigContext pigContext,
+                    FileSpec outFileSpec,
+                    PigStats stats) {
+        this.stats = stats;
+        this.outFileSpec = outFileSpec;
+        this.pigContext = pigContext;
+        this.status = status;
+    }
+
public JOB_STATUS getStatus() {
return status;
}
@@ -122,8 +134,9 @@
return props;
}
- public Map<String, Object> getStatistics() {
- throw new UnsupportedOperationException();
+    /**
+     * Returns the execution statistics attached to this job.
+     *
+     * @return the {@link PigStats} passed to the four-argument constructor, or
+     *         {@code null} if this job was built with the three-argument
+     *         constructor (as done for failed jobs)
+     */
+ public PigStats getStatistics() {
+ return stats;
+ }
public void completionNotification(Object cookie) {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java Mon May 11 20:46:56 2009
@@ -42,6 +42,7 @@
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
public class LocalPigLauncher extends Launcher {
@@ -58,7 +59,7 @@
}
@Override
- public boolean launchPig(PhysicalPlan php, String grpName, PigContext pc)
+ public PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc)
throws PlanException, VisitorException, IOException, ExecException,
JobCreationException {
@@ -69,6 +70,10 @@
int noJobs = stores.size();
int failedJobs = 0;
+
+ PigStats stats = new PigStats();
+ stats.setPhysicalPlan(php);
+ stats.setExecType(pc.getExecType());
for (POStore op : stores) {
op.setStoreImpl(new LocalPOStoreImpl(pc));
@@ -100,23 +105,26 @@
// The remaining stores can be run together.
failedJobs += runPipeline(stores.toArray(new POStore[0]));
+
+ stats.accumulateStats();
UDFFinishVisitor finisher = new UDFFinishVisitor(php, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(php));
finisher.visit();
if (failedJobs == 0) {
+ log.info("Records written : " + stats.getRecordsWritten());
+ log.info("Bytes written : " + stats.getBytesWritten());
log.info("100% complete!");
log.info("Success!!");
- return true;
+ return stats;
} else {
log.info("Failed jobs!!");
log.info(failedJobs + " out of " + noJobs + " failed!");
}
- return false;
+ return null;
}
-
private int runPipeline(POStore[] leaves) throws IOException, ExecException {
BitSet bs = new BitSet(leaves.length);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java Mon May 11 20:46:56 2009
@@ -26,22 +26,28 @@
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrangeForIllustrate;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter;
import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCogroup;
import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCross;
import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplitOutput;
import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POStreamLocal;
import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.LOCogroup;
import org.apache.pig.impl.logicalLayer.LOCross;
import org.apache.pig.impl.logicalLayer.LOSplit;
import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LOStore;
import org.apache.pig.impl.logicalLayer.LOStream;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.PlanWalker;
@@ -215,5 +221,45 @@
}
//currentPlan.explain(System.out);
}
+
+ /**
+  * Translates a logical store into a physical POStore for local execution.
+  * Unlike a plain store translation, it also splices a POCounter between the
+  * store's input operator and the store itself, so that the number of records
+  * reaching the store can be reported after the run.
+  *
+  * @param loStore the logical store being translated
+  * @throws VisitorException if the store schema cannot be copied or the
+  *         physical operators cannot be connected
+  */
+ @Override
+ public void visit(LOStore loStore) throws VisitorException {
+ String scope = loStore.getOperatorKey().scope;
+ POStore store = new POStore(new OperatorKey(scope, nodeGen
+ .getNextNodeId(scope)));
+ store.setSFile(loStore.getOutputFile());
+ store.setInputSpec(loStore.getInputSpec());
+ try {
+ // create a new schema for ourselves so that when
+ // we serialize we are not serializing objects that
+ // contain the schema - apparently Java tries to
+ // serialize the object containing the schema if
+ // we are trying to serialize the schema reference in
+ // the containing object. The schema here will be serialized
+ // in JobControlCompiler
+ // NOTE(review): JobControlCompiler belongs to the map-reduce layer;
+ // confirm whether this reasoning applies to this local-mode translator.
+ store.setSchema(new Schema(loStore.getSchema()));
+ } catch (FrontendException e1) {
+ int errorCode = 1060;
+ String message = "Cannot resolve Store output schema";
+ throw new VisitorException(message, errorCode, PigException.BUG, e1);
+ }
+ //store.setPc(pc);
+ currentPlan.add(store);
+ // Physical operator produced for the store's (single) logical input.
+ PhysicalOperator from = LogToPhyMap.get(loStore
+ .getPlan().getPredecessors(loStore).get(0));
+
+ // Wire input -> counter -> store instead of input -> store. PigStats reads
+ // the record count from the POCounter that precedes each store.
+ POCounter counter = new POCounter(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
+ currentPlan.add(counter);
+ try {
+ currentPlan.connect(from, counter);
+ currentPlan.connect(counter, store);
+ } catch (PlanException e) {
+ int errCode = 2015;
+ String msg = "Invalid physical operators in the physical plan" ;
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+ }
+ // The logical store maps to the POStore, not to the inserted counter.
+ LogToPhyMap.put(loStore, store);
+
+ }
}
Added: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/counters/POCounter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/counters/POCounter.java?rev=773683&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/counters/POCounter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/counters/POCounter.java Mon May 11 20:46:56 2009
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.local.executionengine.physicalLayer.counters;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class POCounter extends PhysicalOperator {
+
+ static final long serialVersionUID = 1L;
+
+ private long count = 0;
+
+ public POCounter(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+ super(k, rp, inp);
+ // TODO Auto-generated constructor stub
+ }
+
+ public POCounter(OperatorKey k, int rp) {
+ super(k, rp);
+ // TODO Auto-generated constructor stub
+ }
+
+ public POCounter(OperatorKey k, List<PhysicalOperator> inp) {
+ super(k, inp);
+ // TODO Auto-generated constructor stub
+ }
+
+ public POCounter(OperatorKey k) {
+ super(k);
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+ // TODO Auto-generated method stub
+
+ }
+
+
+ @Override
+ public Result getNext(Tuple t) throws ExecException {
+ // TODO Auto-generated method stub
+ return getNext(processInput());
+ }
+
+ private Result getNext(Result res) {
+ //System.out.println("Status = " + res.returnStatus);
+ if(res.returnStatus == POStatus.STATUS_OK) {
+ //System.out.println("Incrementing counter");
+ count++;
+ }
+ return res;
+ }
+
+ @Override
+ public String name() {
+ // TODO Auto-generated method stub
+ return "POCounter - " + mKey.toString();
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public long getCount() {
+ return count;
+ }
+
+}
Added: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=773683&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Mon May 11 20:46:56 2009
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+public class PigStats {
+ MROperPlan mrp;
+ PhysicalPlan php;
+ JobControl jc;
+ JobClient jobClient;
+ Map<String, Map<String, String>> stats = new HashMap<String, Map<String,String>>();
+ String lastJobID;
+ ExecType mode;
+
+ public void setMROperatorPlan(MROperPlan mrp) {
+ this.mrp = mrp;
+ }
+
+ public void setJobControl(JobControl jc) {
+ this.jc = jc;
+ }
+
+ public void setJobClient(JobClient jobClient) {
+ this.jobClient = jobClient;
+ }
+
+ public String getMRPlan() {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ mrp.dump(new PrintStream(baos));
+ return baos.toString();
+ }
+
+ public void setExecType(ExecType mode) {
+ this.mode = mode;
+ }
+
+ public void setPhysicalPlan(PhysicalPlan php) {
+ this.php = php;
+ }
+
+ public String getPhysicalPlan() {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ php.explain(baos);
+ return baos.toString();
+ }
+
+ public Map<String, Map<String, String>> accumulateStats() throws ExecException {
+ if(mode == ExecType.MAPREDUCE)
+ return accumulateMRStats();
+ else if(mode == ExecType.LOCAL)
+ return accumulateLocalStats();
+ else
+ throw new RuntimeException("Unrecognized mode. Either MapReduce or Local mode expected.");
+ }
+
+ private Map<String, Map<String, String>> accumulateLocalStats() {
+ //The counter placed before a store in the local plan should be able to get the number of records
+ for(PhysicalOperator op : php.getLeaves()) {
+ Map<String, String> jobStats = new HashMap<String, String>();
+ stats.put(op.toString(), jobStats);
+ POCounter counter = (POCounter) php.getPredecessors(op).get(0);
+ jobStats.put("PIG_STATS_LOCAL_OUTPUT_RECORDS", (Long.valueOf(counter.getCount())).toString());
+ jobStats.put("PIG_STATS_LOCAL_BYTES_WRITTEN", (Long.valueOf((new File(((POStore)op).getSFile().getFileName())).length())).toString());
+ }
+ return stats;
+ }
+
+ private Map<String, Map<String, String>> accumulateMRStats() throws ExecException {
+
+ Job lastJob = getLastJob(jc.getSuccessfulJobs());
+
+ for(Job job : jc.getSuccessfulJobs()) {
+
+
+ JobConf jobConf = job.getJobConf();
+
+
+ RunningJob rj = null;
+ try {
+ rj = jobClient.getJob(job.getAssignedJobID());
+ } catch (IOException e1) {
+ String error = "Unable to get the job statistics from JobClient.";
+ throw new ExecException(error, e1);
+ }
+ if(rj == null)
+ continue;
+
+ Map<String, String> jobStats = new HashMap<String, String>();
+ stats.put(job.getAssignedJobID().toString(), jobStats);
+
+ try {
+ PhysicalPlan plan = (PhysicalPlan) ObjectSerializer.deserialize(jobConf.get("pig.mapPlan"));
+ jobStats.put("PIG_STATS_MAP_PLAN", plan.toString());
+ plan = (PhysicalPlan) ObjectSerializer.deserialize(jobConf.get("pig.combinePlan"));
+ if(plan != null) {
+ jobStats.put("PIG_STATS_COMBINE_PLAN", plan.toString());
+ }
+ plan = (PhysicalPlan) ObjectSerializer.deserialize(jobConf.get("pig.reducePlan"));
+ if(plan != null) {
+ jobStats.put("PIG_STATS_REDUCE_PLAN", plan.toString());
+ }
+ } catch (IOException e2) {
+ String error = "Error deserializing plans from the JobConf.";
+ throw new RuntimeException(error, e2);
+ }
+
+ Counters counters = null;
+ try {
+ counters = rj.getCounters();
+ Counters.Group taskgroup = counters.getGroup("org.apache.hadoop.mapred.Task$Counter");
+ Counters.Group hdfsgroup = counters.getGroup("org.apache.hadoop.mapred.Task$FileSystemCounter");
+
+ System.out.println("BYTES WRITTEN : " + hdfsgroup.getCounterForName("HDFS_WRITE").getCounter());
+
+ jobStats.put("PIG_STATS_MAP_INPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("MAP_INPUT_RECORDS").getCounter())).toString());
+ jobStats.put("PIG_STATS_MAP_OUTPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("MAP_OUTPUT_RECORDS").getCounter())).toString());
+ jobStats.put("PIG_STATS_REDUCE_INPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("REDUCE_INPUT_RECORDS").getCounter())).toString());
+ jobStats.put("PIG_STATS_REDUCE_OUTPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("REDUCE_OUTPUT_RECORDS").getCounter())).toString());
+ jobStats.put("PIG_STATS_BYTES_WRITTEN", (Long.valueOf(hdfsgroup.getCounterForName("HDFS_WRITE").getCounter())).toString());
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ String error = "Unable to get the counters.";
+ throw new ExecException(error, e);
+ }
+
+
+
+ }
+
+ lastJobID = lastJob.getAssignedJobID().toString();
+ return stats;
+ }
+
+
+ private Job getLastJob(List<Job> jobs) {
+ Set<Job> temp = new HashSet<Job>();
+ for(Job job : jobs) {
+ if(job.getDependingJobs() != null && job.getDependingJobs().size() > 0)
+ temp.addAll(job.getDependingJobs());
+ }
+
+ //difference between temp and jobs would be the set of leaves
+ //we can safely assume there would be only one leaf
+ for(Job job : jobs) {
+ if(temp.contains(job))
+ continue;
+ else
+ //this means a leaf
+ return job;
+ }
+ return null;
+ }
+
+ public String getLastJobID() {
+ return lastJobID;
+ }
+
+ public Map<String, Map<String, String>> getPigStats() {
+ return stats;
+ }
+
+ public long getRecordsWritten() {
+ if(mode == ExecType.LOCAL)
+ return getRecordsCountLocal();
+ else if(mode == ExecType.MAPREDUCE)
+ return getRecordsCountMR();
+ else
+ throw new RuntimeException("Unrecognized mode. Either MapReduce or Local mode expected.");
+ }
+
+ private long getRecordsCountLocal() {
+ //System.out.println(getPhysicalPlan());
+ //because of the nature of the parser, there will always be only one store
+
+ for(PhysicalOperator op : php.getLeaves()) {
+ return Long.parseLong(stats.get(op.toString()).get("PIG_STATS_LOCAL_OUTPUT_RECORDS"));
+ }
+ return 0;
+ }
+
+ /**
+ * Returns the no. of records written by the pig script in MR mode
+ * @return
+ */
+ private long getRecordsCountMR() {
+ String reducePlan = stats.get(lastJobID).get("PIG_STATS_REDUCE_PLAN");
+ String records = null;
+ if(reducePlan == null) {
+ records = stats.get(lastJobID).get("PIG_STATS_MAP_OUTPUT_RECORDS");
+ } else {
+ records = stats.get(lastJobID).get("PIG_STATS_REDUCE_OUTPUT_RECORDS");
+ }
+ return Long.parseLong(records);
+ }
+
+ public long getBytesWritten() {
+ if(mode == ExecType.LOCAL) {
+ return getLocalBytesWritten();
+ } else if(mode == ExecType.MAPREDUCE) {
+ return getMapReduceBytesWritten();
+ } else {
+ throw new RuntimeException("Unrecognized mode. Either MapReduce or Local mode expected.");
+ }
+
+ }
+
+ private long getLocalBytesWritten() {
+ for(PhysicalOperator op : php.getLeaves())
+ return Long.parseLong(stats.get(op.toString()).get("PIG_STATS_LOCAL_BYTES_WRITTEN"));
+ return 0;
+ }
+
+ private long getMapReduceBytesWritten() {
+ return Long.parseLong(stats.get(lastJobID).get("PIG_STATS_BYTES_WRITTEN"));
+ }
+
+}
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java?rev=773683&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java Mon May 11 20:46:56 2009
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.junit.Before;
+import org.junit.Test;
+
+ /**
+  * Verifies the record and byte counters exposed through {@link PigStats} against values
+  * computed independently while generating the input data and reading back the output.
+  */
+ public class TestCounters extends TestCase {
+     String file = "input.txt";
+
+     MiniCluster cluster = MiniCluster.buildCluster();
+
+     final int MAX = 100*1000;
+     Random r = new Random();
+
+     @Test
+     public void testMapOnly() throws IOException, ExecException {
+         runMapOnly(null, "Test case Map Only");
+     }
+
+     @Test
+     public void testMapOnlyBinStorage() throws IOException, ExecException {
+         runMapOnly("BinStorage", "Test case Map Only (BinStorage)");
+     }
+
+     @Test
+     public void testMapReduceOnly() throws IOException, ExecException {
+         runGroupBy("c = foreach b generate group;", null, false, "Test case MapReduce");
+     }
+
+     @Test
+     public void testMapReduceOnlyBinStorage() throws IOException, ExecException {
+         runGroupBy("c = foreach b generate group;", "BinStorage", false,
+                 "Test case MapReduce (BinStorage)");
+     }
+
+     @Test
+     public void testMapCombineReduce() throws IOException, ExecException {
+         runGroupBy("c = foreach b generate group, SUM(a.$1);", null, true,
+                 "Test case MapCombineReduce");
+     }
+
+     @Test
+     public void testMapCombineReduceBinStorage() throws IOException, ExecException {
+         runGroupBy("c = foreach b generate group, SUM(a.$1);", "BinStorage", true,
+                 "Test case MapCombineReduce (BinStorage)");
+     }
+
+     @Test
+     public void testMultipleMRJobs() throws IOException, ExecException {
+         String output = "output";
+         try {
+             int count = writeKeyValueInput(new PrintWriter(Util.createInputFile(cluster, file)));
+
+             PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+             pigServer.registerQuery("a = load '" + file + "';");
+             pigServer.registerQuery("b = order a by $0;");
+             pigServer.registerQuery("c = group b by $0;");
+             pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
+             PigStats pigStats = pigServer.store("d", output).getStatistics();
+             long filesize = outputSize(pigServer, output);
+
+             printStats("Test case MultipleMRJobs", pigStats);
+             // The script compiles to several MR jobs; the counters checked are the last job's.
+             Map<String, String> jobStats = pigStats.getPigStats().get(pigStats.getLastJobID());
+             assertNotNull("no stats for last job " + pigStats.getLastJobID(), jobStats);
+             assertCounter(jobStats, "PIG_STATS_MAP_INPUT_RECORDS", MAX);
+             assertCounter(jobStats, "PIG_STATS_MAP_OUTPUT_RECORDS", MAX);
+             assertCounter(jobStats, "PIG_STATS_REDUCE_INPUT_RECORDS", count);
+             assertCounter(jobStats, "PIG_STATS_REDUCE_OUTPUT_RECORDS", count);
+
+             assertEquals(count, pigStats.getRecordsWritten());
+             assertEquals(filesize, pigStats.getBytesWritten());
+         } finally {
+             deleteFromCluster(output);
+         }
+     }
+
+     @Test
+     public void testLocal() throws IOException, ExecException {
+         File inputFile = File.createTempFile("data", ".txt");
+         File out = File.createTempFile("output", ".txt");
+         // Only the unique name is wanted: remove the file so the store starts from a fresh path.
+         out.delete();
+         try {
+             int count = writeKeyValueInput(new PrintWriter(new FileOutputStream(inputFile)));
+
+             PigServer pigServer = new PigServer("local");
+             pigServer.registerQuery("a = load '" + inputFile + "';");
+             pigServer.registerQuery("b = order a by $0;");
+             pigServer.registerQuery("c = group b by $0;");
+             pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
+             PigStats pigStats = pigServer.store("d", out.getAbsolutePath()).getStatistics();
+             long filesize = outputSize(pigServer, out.getAbsolutePath());
+
+             assertEquals(count, pigStats.getRecordsWritten());
+             assertEquals(filesize, pigStats.getBytesWritten());
+         } finally {
+             // Remove both temporary files even when an assertion fails.
+             inputFile.delete();
+             out.delete();
+         }
+     }
+
+     /**
+      * Runs a map-only filter over random ints in [0, 100) and checks the job's counters.
+      *
+      * @param storeFunc store function for the output, or null for the default
+      * @param label heading printed before the job statistics
+      */
+     private void runMapOnly(String storeFunc, String label) throws IOException, ExecException {
+         String output = "output_map_only";
+         try {
+             int count = 0;
+             PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
+             try {
+                 for (int i = 0; i < MAX; i++) {
+                     int t = r.nextInt(100);
+                     pw.println(t);
+                     if (t > 50)
+                         count++;
+                 }
+             } finally {
+                 pw.close();
+             }
+
+             PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+             pigServer.registerQuery("a = load '" + file + "';");
+             pigServer.registerQuery("b = filter a by $0 > 50;");
+             pigServer.registerQuery("c = foreach b generate $0 - 50;");
+             PigStats pigStats = storeAlias(pigServer, "c", output, storeFunc);
+             long filesize = outputSize(pigServer, output);
+
+             printStats(label, pigStats);
+             Map<String, String> jobStats = firstJobStats(pigStats);
+             assertCounter(jobStats, "PIG_STATS_MAP_INPUT_RECORDS", MAX);
+             assertCounter(jobStats, "PIG_STATS_MAP_OUTPUT_RECORDS", count);
+             assertNull(jobStats.get("PIG_STATS_REDUCE_PLAN"));
+             assertNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
+             assertNotNull(jobStats.get("PIG_STATS_MAP_PLAN"));
+             assertCounter(jobStats, "PIG_STATS_REDUCE_INPUT_RECORDS", 0);
+             assertCounter(jobStats, "PIG_STATS_REDUCE_OUTPUT_RECORDS", 0);
+
+             assertEquals(count, pigStats.getRecordsWritten());
+             assertEquals(filesize, pigStats.getBytesWritten());
+         } finally {
+             deleteFromCluster(output);
+         }
+     }
+
+     /**
+      * Groups random key/value input by key in one MR job and checks that job's counters.
+      *
+      * @param foreachQuery statement defining alias c from the grouped alias b
+      * @param storeFunc store function for the output, or null for the default
+      * @param combiner whether a combine plan is expected for the foreach
+      * @param label heading printed before the job statistics
+      */
+     private void runGroupBy(String foreachQuery, String storeFunc, boolean combiner, String label)
+             throws IOException, ExecException {
+         String output = "output";
+         try {
+             int count = writeKeyValueInput(new PrintWriter(Util.createInputFile(cluster, file)));
+
+             PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+             pigServer.registerQuery("a = load '" + file + "';");
+             pigServer.registerQuery("b = group a by $0;");
+             pigServer.registerQuery(foreachQuery);
+             PigStats pigStats = storeAlias(pigServer, "c", output, storeFunc);
+             long filesize = outputSize(pigServer, output);
+
+             printStats(label, pigStats);
+             Map<String, String> jobStats = firstJobStats(pigStats);
+             assertCounter(jobStats, "PIG_STATS_MAP_INPUT_RECORDS", MAX);
+             assertCounter(jobStats, "PIG_STATS_MAP_OUTPUT_RECORDS", MAX);
+             // With a combiner each map emits one record per key; expecting count assumes a
+             // single map task, which the small input file is expected to produce.
+             assertCounter(jobStats, "PIG_STATS_REDUCE_INPUT_RECORDS", combiner ? count : MAX);
+             assertCounter(jobStats, "PIG_STATS_REDUCE_OUTPUT_RECORDS", count);
+
+             if (combiner) {
+                 assertNotNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
+             } else {
+                 assertNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
+             }
+             assertNotNull(jobStats.get("PIG_STATS_MAP_PLAN"));
+             assertNotNull(jobStats.get("PIG_STATS_REDUCE_PLAN"));
+
+             assertEquals(count, pigStats.getRecordsWritten());
+             assertEquals(filesize, pigStats.getBytesWritten());
+         } finally {
+             deleteFromCluster(output);
+         }
+     }
+
+     /**
+      * Writes MAX lines of "key TAB value" (key in [0, 10), value in [0, 100)), then closes pw.
+      *
+      * @return number of distinct keys written, i.e. the number of groups a group-by yields
+      */
+     private int writeKeyValueInput(PrintWriter pw) {
+         // Track key presence, not value sums: a key whose values are all 0 still forms a group.
+         boolean[] seen = new boolean[10];
+         try {
+             for (int i = 0; i < MAX; i++) {
+                 int index = r.nextInt(10);
+                 int value = r.nextInt(100);
+                 seen[index] = true;
+                 pw.println(index + "\t" + value);
+             }
+         } finally {
+             pw.close();
+         }
+         int count = 0;
+         for (boolean present : seen) {
+             if (present)
+                 count++;
+         }
+         return count;
+     }
+
+     /** Stores alias into output, using the default store function when storeFunc is null. */
+     private PigStats storeAlias(PigServer pigServer, String alias, String output, String storeFunc)
+             throws IOException, ExecException {
+         if (storeFunc == null) {
+             return pigServer.store(alias, output).getStatistics();
+         }
+         return pigServer.store(alias, output, storeFunc).getStatistics();
+     }
+
+     /** Counts the bytes of output by reading it back through FileLocalizer. */
+     private long outputSize(PigServer pigServer, String output) throws IOException, ExecException {
+         InputStream is = FileLocalizer.open(FileLocalizer.fullPath(output, pigServer.getPigContext()),
+                 ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+         try {
+             byte[] buf = new byte[8192];
+             long size = 0;
+             int n;
+             while ((n = is.read(buf)) != -1) {
+                 size += n;
+             }
+             return size;
+         } finally {
+             is.close();
+         }
+     }
+
+     /** Returns the stats of the first job recorded in pigStats. */
+     private Map<String, String> firstJobStats(PigStats pigStats) {
+         Map<String, Map<String, String>> stats = pigStats.getPigStats();
+         assertFalse("no job statistics recorded", stats.isEmpty());
+         return stats.values().iterator().next();
+     }
+
+     /** Prints the MR plan and every job's statistics to stdout to aid debugging. */
+     private void printStats(String label, PigStats pigStats) {
+         System.out.println("============================================");
+         System.out.println(label);
+         System.out.println("============================================");
+         System.out.println("MRPlan : \n" + pigStats.getMRPlan());
+         for (Map.Entry<String, Map<String, String>> entry : pigStats.getPigStats().entrySet()) {
+             System.out.println("============================================");
+             System.out.println("Job : " + entry.getKey());
+             for (Map.Entry<String, String> e : entry.getValue().entrySet()) {
+                 System.out.println(" - " + e.getKey() + " : \n" + e.getValue());
+             }
+             System.out.println("============================================");
+         }
+     }
+
+     /** Prints the counter, asserts that it is present, and asserts that it equals expected. */
+     private void assertCounter(Map<String, String> jobStats, String key, long expected) {
+         String value = jobStats.get(key);
+         System.out.println(key + " : " + value);
+         assertNotNull("missing counter " + key, value);
+         assertEquals(key, expected, Long.parseLong(value));
+     }
+
+     /** Deletes the shared input file and output from the MiniCluster file system. */
+     private void deleteFromCluster(String output) throws IOException {
+         cluster.getFileSystem().delete(new Path(file), true);
+         cluster.getFileSystem().delete(new Path(output), true);
+     }
+ }
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java Mon May 11 20:46:56 2009
@@ -85,7 +85,7 @@
LogicalPlan lp = checkLogicalPlan(1, 2, 9);
// XXX Physical plan has one less node in the local case
- PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 10);
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 12);
Assert.assertTrue(executePlan(pp));
@@ -142,7 +142,7 @@
public void testMultiQueryWithTwoStores2Execs() {
System.out.println("===== test multi-query with 2 stores (2) =====");
-
+
try {
myPig.setBatchOn();
@@ -185,7 +185,7 @@
LogicalPlan lp = checkLogicalPlan(1, 3, 14);
- PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 14);
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 17);
Assert.assertTrue(executePlan(pp));
@@ -247,7 +247,7 @@
LogicalPlan lp = checkLogicalPlan(2, 3, 16);
// XXX the total number of ops is one less in the local case
- PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 18);
+ PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 21);
Assert.assertTrue(executePlan(pp));
@@ -458,7 +458,7 @@
myPig.registerQuery("store c into '/tmp/output5';");
LogicalPlan lp = checkLogicalPlan(1, 3, 14);
- PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 16);
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 21);
myPig.executeBatch();
myPig.discardBatch();
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java Mon May 11 20:46:56 2009
@@ -49,6 +49,7 @@
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.backend.local.executionengine.LocalPigLauncher;
import org.apache.pig.backend.local.executionengine.LocalPOStoreImpl;
+import org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -59,6 +60,7 @@
import org.apache.pig.test.utils.GenPhyOp;
import org.apache.pig.test.utils.GenRandomData;
import org.apache.pig.test.utils.TestHelper;
+import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.impl.logicalLayer.LOStore;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -79,10 +81,12 @@
PigContext pc;
POProject proj;
PigServer pig;
+ POCounter pcount;
@Before
public void setUp() throws Exception {
st = GenPhyOp.topStoreOp();
+ pcount = new POCounter(new OperatorKey("", (new Random()).nextLong()));
fSpec = new FileSpec("file:/tmp/storeTest.txt",
new FuncSpec(PigStorage.class.getName(), new String[]{":"}));
st.setSFile(fSpec);
@@ -104,11 +108,15 @@
public void tearDown() throws Exception {
}
- private boolean store() throws Exception {
+ private PigStats store() throws Exception {
PhysicalPlan pp = new PhysicalPlan();
pp.add(proj);
pp.add(st);
- pp.connect(proj, st);
+ pp.add(pcount);
+ // Route records through the POCounter (proj -> pcount -> st) instead of connecting proj directly to st.
+ pp.connect(proj, pcount);
+ pp.connect(pcount, st);
+ pc.setExecType(ExecType.LOCAL);
return new LocalPigLauncher().launchPig(pp, "TestStore", pc);
}
@@ -118,7 +126,7 @@
Tuple t = new DefaultTuple();
t.append(inpDB);
proj.attachInput(t);
- assertTrue(store());
+ assertTrue(store() != null);
int size = 0;
BufferedReader br = new BufferedReader(new FileReader("/tmp/storeTest.txt"));
@@ -145,7 +153,7 @@
Tuple t = new DefaultTuple();
t.append(inpDB);
proj.attachInput(t);
- assertTrue(store());
+ assertTrue(store() != null);
PigStorage ps = new PigStorage(":");
int size = 0;
@@ -178,7 +186,7 @@
Tuple t = new DefaultTuple();
t.append(inpDB);
proj.attachInput(t);
- assertTrue(store());
+ assertTrue(store() != null);
PigStorage ps = new PigStorage(":");
int size = 0;
Modified: hadoop/pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/Util.java?rev=773683&r1=773682&r2=773683&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/Util.java Mon May 11 20:46:56 2009
@@ -25,6 +25,7 @@
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
+import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
@@ -237,6 +238,28 @@
}
/**
+ * Helper to create a dfs file on the MiniCluster dfs. This returns an
+ * outputstream that can be used in test cases to write data.
+ *
+ * @param cluster
+ * reference to the MiniCluster where the file should be created
+ * @param fileName
+ * pathname of the file to be created
+ * @return OutputStream to write any data to the file created on the
+ * MiniCluster.
+ * @throws IOException if the file already exists or cannot be created
+ */
+ static public OutputStream createInputFile(MiniCluster cluster, String fileName) throws IOException {
+     FileSystem fs = cluster.getFileSystem();
+     Path path = new Path(fileName);
+     if (fs.exists(path)) {
+         throw new IOException("File " + fileName + " already exists on the minicluster");
+     }
+     // overwrite=false: fail rather than clobber a file created after the exists() check
+     return fs.create(path, false);
+ }
+
+ /**
* Helper to remove a dfs file from the minicluster DFS
*
* @param miniCluster reference to the Minicluster where the file should be deleted