You are viewing a plain-text version of this content. The canonical version is the original message in the commits@pig.apache.org mailing-list archive.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/04/11 19:08:46 UTC

svn commit: r1586720 - in /pig/trunk: ./ src/org/apache/pig/tools/pigstats/ src/org/apache/pig/tools/pigstats/mapreduce/

Author: rohini
Date: Fri Apr 11 17:08:46 2014
New Revision: 1586720

URL: http://svn.apache.org/r1586720
Log:
PIG-3884: Move multi store counters to PigStatsUtil from MRPigStatsUtil (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
    pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1586720&r1=1586719&r2=1586720&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Apr 11 17:08:46 2014
@@ -30,7 +30,7 @@ PIG-2207: Support custom counters for ag
 
 IMPROVEMENTS
 
-PIG-3868: Fix Iterator_1 e2e test on windows (ssvinarchukhorton via rohini)
+PIG-3884: Move multi store counters to PigStatsUtil from MRPigStatsUtil (rohini)
 
 PIG-3591: Refactor POPackage to separate MR specific code from packaging (mwagner via cheolsoo)
 
@@ -105,6 +105,8 @@ PIG-3882: Multiquery off mode execution 
  
 BUG FIXES
 
+PIG-3868: Fix Iterator_1 e2e test on windows (ssvinarchukhorton via rohini)
+
 PIG-3871: Replace org.python.google.* with com.google.* in imports (cheolsoo)
 
 PIG-3858: PigLogger/PigStatusReporter is not set for fetch tasks (lbendig via cheolsoo)

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1586720&r1=1586719&r2=1586720&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java Fri Apr 11 17:08:46 2014
@@ -18,19 +18,27 @@
 
 package org.apache.pig.tools.pigstats;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.Counters;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.FileBasedOutputSizeReader;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigStatsOutputSizeReader;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.PlanVisitor;
-import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 
 /**
@@ -41,6 +49,8 @@ import org.apache.pig.tools.pigstats.Pig
 @InterfaceStability.Evolving
 public abstract class JobStats extends Operator {
 
+    private static final Log LOG = LogFactory.getLog(JobStats.class);
+
     public static final String ALIAS = "JobStatistics:alias";
     public static final String ALIAS_LOCATION = "JobStatistics:alias_location";
     public static final String FEATURE = "JobStatistics:feature";
@@ -321,4 +331,37 @@ public abstract class JobStats extends O
     @Deprecated
     abstract public Map<String, Long> getMultiInputCounters();
 
+    /**
+     * Looks up the output size reader from OUTPUT_SIZE_READER_KEY and invokes
+     * it to get the size of output. If OUTPUT_SIZE_READER_KEY is not set,
+     * defaults to FileBasedOutputSizeReader.
+     * @param sto POStore
+     * @param conf configuration
+     */
+    public static long getOutputSize(POStore sto, Configuration conf) {
+        PigStatsOutputSizeReader reader = null;
+        String readerNames = conf.get(
+                PigStatsOutputSizeReader.OUTPUT_SIZE_READER_KEY,
+                FileBasedOutputSizeReader.class.getCanonicalName());
+
+        for (String className : readerNames.split(",")) {
+            reader = (PigStatsOutputSizeReader) PigContext.instantiateFuncFromSpec(className);
+            if (reader.supports(sto)) {
+                LOG.info("using output size reader: " + className);
+                try {
+                    return reader.getOutputSize(sto, conf);
+                } catch (FileNotFoundException e) {
+                    LOG.warn("unable to find the output file", e);
+                    return -1;
+                } catch (IOException e) {
+                    LOG.warn("unable to get byte written of the job", e);
+                    return -1;
+                }
+            }
+        }
+
+        LOG.warn("unable to find an output size reader");
+        return -1;
+    }
+
 }

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1586720&r1=1586719&r2=1586720&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Fri Apr 11 17:08:46 2014
@@ -23,8 +23,8 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
-import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
 
 /**
@@ -45,31 +45,12 @@ public class PigStatsUtil {
     public static final String HDFS_BYTES_READ
             = "HDFS_BYTES_READ";
 
-    /**
-     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MULTI_INPUT_RECORD_COUNTER} instead.
-     */
-    @Deprecated
     public static final String MULTI_INPUTS_RECORD_COUNTER
             = "Input records from ";
-
-    /**
-     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MULTI_INPUT_COUNTER_GROUP} instead.
-     */
-    @Deprecated
     public static final String MULTI_INPUTS_COUNTER_GROUP
             = "MultiInputCounters";
-
-    /**
-     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MULTI_STORE_RECORD_COUNTER} instead.
-     */
-    @Deprecated
     public static final String MULTI_STORE_RECORD_COUNTER
             = "Output records in ";
-
-    /**
-     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MULTI_STORE_COUNTER_GROUP} instead.
-     */
-    @Deprecated
     public static final String MULTI_STORE_COUNTER_GROUP
             = "MultiStoreCounters";
 
@@ -149,4 +130,56 @@ public class PigStatsUtil {
         PigStats.start(new EmbeddedPigStats(statsMap));
     }
 
+    /**
+     * Returns the counter name for the given input file name
+     *
+     * @param fname the input file name
+     * @return the counter name
+     */
+    public static String getMultiInputsCounterName(String fname, int index) {
+        String shortName = getShortName(fname);
+        return (shortName == null) ? null
+                : MULTI_INPUTS_RECORD_COUNTER + "_" + index + "_" + shortName;
+    }
+
+    /**
+     * Returns the counter name for the given {@link POStore}
+     *
+     * @param store the POStore
+     * @return the counter name
+     */
+    public static String getMultiStoreCounterName(POStore store) {
+        String shortName = getShortName(store.getSFile().getFileName());
+        return (shortName == null) ? null
+                : MULTI_STORE_RECORD_COUNTER + "_" + store.getIndex() + "_" + shortName;
+    }
+
+    // Restrict total string size of a counter name to 64 characters.
+    // Leave 24 characters for prefix string.
+    private static final int COUNTER_NAME_LIMIT = 40;
+    private static final String SEPARATOR = "/";
+    private static final String SEMICOLON = ";";
+
+    private static String getShortName(String uri) {
+        int scolon = uri.indexOf(SEMICOLON);
+        int slash;
+        if (scolon!=-1) {
+            slash = uri.lastIndexOf(SEPARATOR, scolon);
+        } else {
+            slash = uri.lastIndexOf(SEPARATOR);
+        }
+        String shortName = null;
+        if (scolon==-1) {
+            shortName = uri.substring(slash+1);
+        }
+        if (slash < scolon) {
+            shortName = uri.substring(slash+1, scolon);
+        }
+        if (shortName != null && shortName.length() > COUNTER_NAME_LIMIT) {
+            shortName = shortName.substring(shortName.length()
+                    - COUNTER_NAME_LIMIT);
+        }
+        return shortName;
+    }
+
 }

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1586720&r1=1586719&r2=1586720&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Fri Apr 11 17:08:46 2014
@@ -18,7 +18,6 @@
 
 package org.apache.pig.tools.pigstats.mapreduce;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -31,24 +30,21 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskReport;
-import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.pig.PigCounters;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.FileBasedOutputSizeReader;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigStatsOutputSizeReader;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
-import org.apache.pig.newplan.PlanVisitor;
-import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.newplan.PlanVisitor;
 import org.apache.pig.tools.pigstats.InputStats;
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
@@ -118,58 +114,80 @@ public final class MRJobStats extends Jo
 
     private Counters counters = null;
 
+    @Override
     public String getJobId() {
         return (jobId == null) ? null : jobId.toString();
     }
 
+    @Override
     public int getNumberMaps() { return numberMaps; }
 
+    @Override
     public int getNumberReduces() { return numberReduces; }
 
+    @Override
     public long getMaxMapTime() { return maxMapTime; }
 
+    @Override
     public long getMinMapTime() { return minMapTime; }
 
+    @Override
     public long getAvgMapTime() { return avgMapTime; }
 
+    @Override
     public long getMaxReduceTime() { return maxReduceTime; }
 
+    @Override
     public long getMinReduceTime() { return minReduceTime; }
 
+    @Override
     public long getAvgReduceTime() { return avgReduceTime; }
 
+    @Override
     public long getMapInputRecords() { return mapInputRecords; }
 
+    @Override
     public long getMapOutputRecords() { return mapOutputRecords; }
 
+    @Override
     public long getReduceInputRecords() { return reduceInputRecords; }
 
+    @Override
     public long getReduceOutputRecords() { return reduceOutputRecords; }
 
+    @Override
     public long getSMMSpillCount() { return spillCount; }
 
+    @Override
     public long getProactiveSpillCountObjects() { return activeSpillCountObj; }
 
+    @Override
     public long getProactiveSpillCountRecs() { return activeSpillCountRecs; }
 
+    @Override
     public Counters getHadoopCounters() { return counters; }
 
+    @Override
     public Map<String, Long> getMultiStoreCounters() {
         return Collections.unmodifiableMap(multiStoreCounters);
     }
 
+    @Override
     public Map<String, Long> getMultiInputCounters() {
         return Collections.unmodifiableMap(multiInputCounters);
     }
 
+    @Override
     public String getAlias() {
         return (String)getAnnotation(ALIAS);
     }
 
+    @Override
     public String getAliasLocation() {
         return (String)getAnnotation(ALIAS_LOCATION);
     }
 
+    @Override
     public String getFeature() {
         return (String)getAnnotation(FEATURE);
     }
@@ -219,6 +237,7 @@ public final class MRJobStats extends Jo
         medianReduceTime = median;
     }
 
+    @Override
     public String getDisplayString(boolean local) {
         StringBuilder sb = new StringBuilder();
         String id = (jobId == null) ? "N/A" : jobId.toString();
@@ -420,39 +439,6 @@ public final class MRJobStats extends Jo
         }
     }
 
-    /**
-     * Looks up the output size reader from OUTPUT_SIZE_READER_KEY and invokes
-     * it to get the size of output. If OUTPUT_SIZE_READER_KEY is not set,
-     * defaults to FileBasedOutputSizeReader.
-     * @param sto POStore
-     * @param conf configuration
-     */
-    static long getOutputSize(POStore sto, Configuration conf) {
-        PigStatsOutputSizeReader reader = null;
-        String readerNames = conf.get(
-                PigStatsOutputSizeReader.OUTPUT_SIZE_READER_KEY,
-                FileBasedOutputSizeReader.class.getCanonicalName());
-
-        for (String className : readerNames.split(",")) {
-            reader = (PigStatsOutputSizeReader) PigContext.instantiateFuncFromSpec(className);
-            if (reader.supports(sto)) {
-                LOG.info("using output size reader: " + className);
-                try {
-                    return reader.getOutputSize(sto, conf);
-                } catch (FileNotFoundException e) {
-                    LOG.warn("unable to find the output file", e);
-                    return -1;
-                } catch (IOException e) {
-                    LOG.warn("unable to get byte written of the job", e);
-                    return -1;
-                }
-            }
-        }
-
-        LOG.warn("unable to find an output size reader");
-        return -1;
-    }
-
     private void addOneOutputStats(POStore sto) {
         long records = -1;
         if (sto.isMultiStore()) {

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java?rev=1586720&r1=1586719&r2=1586720&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java Fri Apr 11 17:08:46 2014
@@ -20,6 +20,7 @@ package org.apache.pig.tools.pigstats.ma
 
 import java.io.IOException;
 import java.util.Map;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.Counters;
@@ -32,16 +33,13 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.classification.InterfaceAudience.Private;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.tools.pigstats.PigStats.JobGraph;
-import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
-import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
+import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
-import org.apache.pig.tools.pigstats.JobStats;
 
 
 
@@ -50,37 +48,13 @@ import org.apache.pig.tools.pigstats.Job
  */
 public class MRPigStatsUtil extends PigStatsUtil {
 
-    public static final String MULTI_STORE_RECORD_COUNTER
-            = "Output records in ";
-    public static final String MULTI_STORE_COUNTER_GROUP
-            = "MultiStoreCounters";
     public static final String TASK_COUNTER_GROUP
             = "org.apache.hadoop.mapred.Task$Counter";
     public static final String FS_COUNTER_GROUP
             = HadoopShims.getFsCounterGroupName();
-    public static final String MAP_INPUT_RECORDS
-            = "MAP_INPUT_RECORDS";
-    public static final String MAP_OUTPUT_RECORDS
-            = "MAP_OUTPUT_RECORDS";
-    public static final String REDUCE_INPUT_RECORDS
-            = "REDUCE_INPUT_RECORDS";
-    public static final String REDUCE_OUTPUT_RECORDS
-            = "REDUCE_OUTPUT_RECORDS";
-    public static final String HDFS_BYTES_WRITTEN
-            = "HDFS_BYTES_WRITTEN";
-    public static final String HDFS_BYTES_READ
-            = "HDFS_BYTES_READ";
-    public static final String MULTI_INPUTS_RECORD_COUNTER
-            = "Input records from ";
-    public static final String MULTI_INPUTS_COUNTER_GROUP
-            = "MultiInputCounters";
 
     private static final Log LOG = LogFactory.getLog(MRPigStatsUtil.class);
 
-    // Restrict total string size of a counter name to 64 characters.
-    // Leave 24 characters for prefix string.
-    private static final int COUNTER_NAME_LIMIT = 40;
-
     /**
      * Returns the count for the given counter name in the counter group
      * 'MultiStoreCounters'
@@ -107,55 +81,6 @@ public class MRPigStatsUtil extends PigS
     }
 
     /**
-     * Returns the counter name for the given {@link POStore}
-     *
-     * @param store the POStore
-     * @return the counter name
-     */
-    public static String getMultiStoreCounterName(POStore store) {
-        String shortName = getShortName(store.getSFile().getFileName());
-        return (shortName == null) ? null
-                : MULTI_STORE_RECORD_COUNTER + "_" + store.getIndex() + "_" + shortName;
-    }
-
-    /**
-     * Returns the counter name for the given input file name
-     *
-     * @param fname the input file name
-     * @return the counter name
-     */
-    public static String getMultiInputsCounterName(String fname, int index) {
-        String shortName = getShortName(fname);
-        return (shortName == null) ? null
-                : MULTI_INPUTS_RECORD_COUNTER + "_" + index + "_" + shortName;
-    }
-
-    private static final String SEPARATOR = "/";
-    private static final String SEMICOLON = ";";
-
-    private static String getShortName(String uri) {
-        int scolon = uri.indexOf(SEMICOLON);
-        int slash;
-        if (scolon!=-1) {
-            slash = uri.lastIndexOf(SEPARATOR, scolon);
-        } else {
-            slash = uri.lastIndexOf(SEPARATOR);
-        }
-        String shortName = null;
-        if (scolon==-1) {
-            shortName = uri.substring(slash+1);
-        }
-        if (slash < scolon) {
-            shortName = uri.substring(slash+1, scolon);
-        }
-        if (shortName != null && shortName.length() > COUNTER_NAME_LIMIT) {
-            shortName = shortName.substring(shortName.length()
-                    - COUNTER_NAME_LIMIT);
-        }
-        return shortName;
-    }
-
-    /**
      * Starts collecting statistics for the given MR plan
      *
      * @param pc the Pig context