You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/02/03 20:24:34 UTC

svn commit: r1564016 - in /pig/trunk: ./ conf/ ivy/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ test/org/apache/pig/test/

Author: daijy
Date: Mon Feb  3 19:24:34 2014
New Revision: 1564016

URL: http://svn.apache.org/r1564016
Log:
PIG-3299: Provide support for LazyOutputFormat to avoid creating empty files

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/conf/pig.properties
    pig/trunk/ivy/libraries.properties
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/test/org/apache/pig/test/TestStore.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1564016&r1=1564015&r2=1564016&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Feb  3 19:24:34 2014
@@ -30,6 +30,8 @@ PIG-2207: Support custom counters for ag
 
 IMPROVEMENTS
 
+PIG-3299: Provide support for LazyOutputFormat to avoid creating empty files (lbendig via daijy)
+
 PIG-3642: Direct HDFS access for small jobs (fetch) (lbendig via cheolsoo)
 
 PIG-3730: Performance issue in SelfSpillBag (rajesh.balamohan via rohini)

Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1564016&r1=1564015&r2=1564016&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Mon Feb  3 19:24:34 2014
@@ -243,3 +243,7 @@ pig.location.check.strict=false
 # Set value in long as a threshold number of bytes to convert
 # jobs with smaller input data size to run in local mode
 # pig.auto.local.input.maxbytes=100000000
+
+# When enabled, jobs won't create empty part files if no output is written. In this case
+# PigOutputFormat will be wrapped with org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat.
+# pig.output.lazy=true

Modified: pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1564016&r1=1564015&r2=1564016&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Mon Feb  3 19:24:34 2014
@@ -37,8 +37,8 @@ jasper.version=6.1.14
 groovy.version=1.8.6
 guava.version=11.0
 jersey-core.version=1.8
-hadoop-core.version=1.0.0
-hadoop-test.version=1.0.0
+hadoop-core.version=1.0.4
+hadoop-test.version=1.0.4
 hadoop-common.version=2.0.3-alpha
 hadoop-hdfs.version=2.0.3-alpha
 hadoop-mapreduce.version=2.0.3-alpha

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1564016&r1=1564015&r2=1564016&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Mon Feb  3 19:24:34 2014
@@ -154,5 +154,12 @@ public class PigConfiguration {
      */
     public static final String OPT_FETCH = "opt.fetch";
     
+
+    /**
+     * This key is used to define whether PigOutputFormat will be wrapped with LazyOutputFormat
+     * so that jobs won't write empty part files if no output is generated
+     */
+    public static final String PIG_OUTPUT_LAZY = "pig.output.lazy";
+
 }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1564016&r1=1564015&r2=1564016&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Feb  3 19:24:34 2014
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.lang.reflect.Method;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -668,8 +669,26 @@ public class JobControlCompiler{
                 sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
             }
 
-            // the OutputFormat we report to Hadoop is always PigOutputFormat
-            nwJob.setOutputFormatClass(PigOutputFormat.class);
+            // the OutputFormat we report to Hadoop is always PigOutputFormat, which
+            // is wrapped with LazyOutputFormat provided that it is supported by
+            // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
+            if ("true".equalsIgnoreCase(conf.get(PigConfiguration.PIG_OUTPUT_LAZY))) {
+                try {
+                    Class<?> clazz = PigContext.resolveClassName(
+                            "org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
+                    Method method = clazz.getMethod("setOutputFormatClass", nwJob.getClass(),
+                            Class.class);
+                    method.invoke(null, nwJob, PigOutputFormat.class);
+                }
+                catch (Exception e) {
+                    nwJob.setOutputFormatClass(PigOutputFormat.class);
+                    log.warn(PigConfiguration.PIG_OUTPUT_LAZY
+                            + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
+                }
+            }
+            else {
+                nwJob.setOutputFormatClass(PigOutputFormat.class);
+            }
 
             if (mapStores.size() + reduceStores.size() == 1) { // single store case
                 log.info("Setting up single store job");
@@ -691,8 +710,6 @@ public class JobControlCompiler{
             else if (mapStores.size() + reduceStores.size() > 0) { // multi store case
                 log.info("Setting up multi store job");
                 MapRedUtil.setupStreamingDirsConfMulti(pigContext, conf);
-                
-                nwJob.setOutputFormatClass(PigOutputFormat.class);
 
                 boolean disableCounter = conf.getBoolean("pig.disable.counter", false);
                 if (disableCounter) {

Modified: pig/trunk/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=1564016&r1=1564015&r2=1564016&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStore.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestStore.java Mon Feb  3 19:24:34 2014
@@ -19,6 +19,7 @@ package org.apache.pig.test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 import java.io.BufferedReader;
@@ -45,6 +46,7 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
 import org.apache.pig.ResourceSchema;
@@ -84,6 +86,7 @@ import org.apache.pig.test.utils.TestHel
 import org.joda.time.DateTimeZone;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -691,6 +694,86 @@ public class TestStore {
             Util.deleteFile(ps.getPigContext(), TESTDIR);
         }
     }
+    
+    /**
+     * Test whether "part-m-00000" file is created on empty output when 
+     * {@link PigConfiguration#PIG_OUTPUT_LAZY} is set and if LazyOutputFormat is
+     * supported by Hadoop.
+     * The test covers multi store and single store case in local and mapreduce mode
+     *
+     * @throws IOException
+     */
+    @Test
+    public void testEmptyPartFileCreation() throws IOException {
+        
+        boolean isLazyOutputPresent = true;
+        try {
+            Class<?> clazz = PigContext
+                    .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
+            clazz.getMethod("setOutputFormatClass", Job.class, Class.class);
+        }
+        catch (Exception e) {
+            isLazyOutputPresent = false;
+        }
+        
+        //skip test if LazyOutputFormat is not supported (<= Hadoop 1.0.0)
+        Assume.assumeTrue("LazyOutputFormat couldn't be loaded, test is skipped", isLazyOutputPresent);
+        
+        PigServer ps = null;
+
+        try {
+            ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE};
+            String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"};
+
+            String multiStoreScript = "a = load '"+ inputFileName + "';" +
+                    "b = filter a by $0 == 'hey';" +
+                    "c = filter a by $1 == 'globe';" +
+                    "d = limit a 2;" +
+                    "e = foreach d generate *, 'x';" +
+                    "f = filter e by $3 == 'y';" +
+                    "store b into '" + outputFileName + "_1';" +
+                    "store c into '" + outputFileName + "_2';" +
+                    "store f into '" + outputFileName + "_3';";
+
+            String singleStoreScript =  "a = load '"+ inputFileName + "';" +
+                    "b = filter a by $0 == 'hey';" +
+                    "store b into '" + outputFileName + "_1';" ;
+
+            for (ExecType execType : modes) {
+                for(boolean isMultiStore: new boolean[] { true, false}) {
+                    String script = (isMultiStore ? multiStoreScript :
+                        singleStoreScript);
+                    // since we will be switching between map red and local modes
+                    // we will need to make sure filelocalizer is reset before each
+                    // run.
+                    FileLocalizer.setInitialized(false);
+                    if(execType == ExecType.MAPREDUCE) {
+                        ps = new PigServer(ExecType.MAPREDUCE,
+                                cluster.getProperties());
+                    } else {
+                        Properties props = new Properties();
+                        props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+                        ps = new PigServer(ExecType.LOCAL, props);
+                    }
+                    ps.getPigContext().getProperties().setProperty(
+                            PigConfiguration.PIG_OUTPUT_LAZY, "true");
+                    Util.deleteFile(ps.getPigContext(), TESTDIR);
+                    ps.setBatchOn();
+                    Util.createInputFile(ps.getPigContext(),
+                            inputFileName, inputData);
+                    Util.registerMultiLineQuery(ps, script);
+                    ps.executeBatch();
+                    for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) {
+                        String output = "part-m-00000";
+                        assertFalse("For an empty output part-m-00000 should not exist in " +
+                                execType + " mode", Util.exists(ps.getPigContext(), output));
+                    }
+                }
+            }
+        } finally {
+            Util.deleteFile(ps.getPigContext(), TESTDIR);
+        }
+    }
 
     // A UDF which always throws an Exception so that the job can fail
     public static class FailUDF extends EvalFunc<String> {