You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/02/03 20:24:34 UTC
svn commit: r1564016 - in /pig/trunk: ./ conf/ ivy/ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
test/org/apache/pig/test/
Author: daijy
Date: Mon Feb 3 19:24:34 2014
New Revision: 1564016
URL: http://svn.apache.org/r1564016
Log:
PIG-3299: Provide support for LazyOutputFormat to avoid creating empty files
Modified:
pig/trunk/CHANGES.txt
pig/trunk/conf/pig.properties
pig/trunk/ivy/libraries.properties
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/test/org/apache/pig/test/TestStore.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1564016&r1=1564015&r2=1564016&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Feb 3 19:24:34 2014
@@ -30,6 +30,8 @@ PIG-2207: Support custom counters for ag
IMPROVEMENTS
+PIG-3299: Provide support for LazyOutputFormat to avoid creating empty files (lbendig via daijy)
+
PIG-3642: Direct HDFS access for small jobs (fetch) (lbendig via cheolsoo)
PIG-3730: Performance issue in SelfSpillBag (rajesh.balamohan via rohini)
Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1564016&r1=1564015&r2=1564016&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Mon Feb 3 19:24:34 2014
@@ -243,3 +243,7 @@ pig.location.check.strict=false
# Set value in long as a threshold number of bytes to convert
# jobs with smaller input data size to run in local mode
# pig.auto.local.input.maxbytes=100000000
+
+# When enabled, jobs won't create empty part files if no output is written. In this case
+# PigOutputFormat will be wrapped with org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat.
+# pig.output.lazy=true
Modified: pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1564016&r1=1564015&r2=1564016&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Mon Feb 3 19:24:34 2014
@@ -37,8 +37,8 @@ jasper.version=6.1.14
groovy.version=1.8.6
guava.version=11.0
jersey-core.version=1.8
-hadoop-core.version=1.0.0
-hadoop-test.version=1.0.0
+hadoop-core.version=1.0.4
+hadoop-test.version=1.0.4
hadoop-common.version=2.0.3-alpha
hadoop-hdfs.version=2.0.3-alpha
hadoop-mapreduce.version=2.0.3-alpha
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1564016&r1=1564015&r2=1564016&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Mon Feb 3 19:24:34 2014
@@ -154,5 +154,12 @@ public class PigConfiguration {
*/
public static final String OPT_FETCH = "opt.fetch";
+
+ /**
+ * This key is used to define whether PigOutputFormat will be wrapped with LazyOutputFormat
+ * so that jobs won't write empty part files if no output is generated
+ */
+ public static final String PIG_OUTPUT_LAZY = "pig.output.lazy";
+
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1564016&r1=1564015&r2=1564016&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Feb 3 19:24:34 2014
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
@@ -668,8 +669,26 @@ public class JobControlCompiler{
sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
}
- // the OutputFormat we report to Hadoop is always PigOutputFormat
- nwJob.setOutputFormatClass(PigOutputFormat.class);
+ // the OutputFormat we report to Hadoop is always PigOutputFormat, which
+ // is wrapped with LazyOutputFormat when the Hadoop version supports it
+ // and PigConfiguration.PIG_OUTPUT_LAZY is set
+ if ("true".equalsIgnoreCase(conf.get(PigConfiguration.PIG_OUTPUT_LAZY))) {
+ try {
+ Class<?> clazz = PigContext.resolveClassName(
+ "org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
+ Method method = clazz.getMethod("setOutputFormatClass", nwJob.getClass(),
+ Class.class);
+ method.invoke(null, nwJob, PigOutputFormat.class);
+ }
+ catch (Exception e) {
+ nwJob.setOutputFormatClass(PigOutputFormat.class);
+ log.warn(PigConfiguration.PIG_OUTPUT_LAZY
+ + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
+ }
+ }
+ else {
+ nwJob.setOutputFormatClass(PigOutputFormat.class);
+ }
if (mapStores.size() + reduceStores.size() == 1) { // single store case
log.info("Setting up single store job");
@@ -691,8 +710,6 @@ public class JobControlCompiler{
else if (mapStores.size() + reduceStores.size() > 0) { // multi store case
log.info("Setting up multi store job");
MapRedUtil.setupStreamingDirsConfMulti(pigContext, conf);
-
- nwJob.setOutputFormatClass(PigOutputFormat.class);
boolean disableCounter = conf.getBoolean("pig.disable.counter", false);
if (disableCounter) {
Modified: pig/trunk/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=1564016&r1=1564015&r2=1564016&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStore.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestStore.java Mon Feb 3 19:24:34 2014
@@ -19,6 +19,7 @@ package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.io.BufferedReader;
@@ -45,6 +46,7 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.pig.EvalFunc;
import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.ResourceSchema;
@@ -84,6 +86,7 @@ import org.apache.pig.test.utils.TestHel
import org.joda.time.DateTimeZone;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
@@ -691,6 +694,86 @@ public class TestStore {
Util.deleteFile(ps.getPigContext(), TESTDIR);
}
}
+
+ /**
+  * Verifies that no part file is created for an empty output when
+  * {@link PigConfiguration#PIG_OUTPUT_LAZY} is set and LazyOutputFormat is
+  * supported by Hadoop; the test is skipped if LazyOutputFormat is unavailable.
+  * The test covers multi store and single store case in local and mapreduce mode
+  *
+  * @throws IOException
+  */
+ @Test
+ public void testEmptyPartFileCreation() throws IOException {
+     boolean isLazyOutputPresent = true;
+     try {
+         Class<?> clazz = PigContext
+                 .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
+         clazz.getMethod("setOutputFormatClass", Job.class, Class.class);
+     }
+     catch (Exception e) {
+         isLazyOutputPresent = false;
+     }
+
+     //skip test if LazyOutputFormat is not supported (<= Hadoop 1.0.0)
+     Assume.assumeTrue("LazyOutputFormat couldn't be loaded, test is skipped", isLazyOutputPresent);
+
+     PigServer ps = null;
+
+     try {
+         ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE};
+         String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"};
+
+         String multiStoreScript = "a = load '"+ inputFileName + "';" +
+                 "b = filter a by $0 == 'hey';" +
+                 "c = filter a by $1 == 'globe';" +
+                 "d = limit a 2;" +
+                 "e = foreach d generate *, 'x';" +
+                 "f = filter e by $3 == 'y';" +
+                 "store b into '" + outputFileName + "_1';" +
+                 "store c into '" + outputFileName + "_2';" +
+                 "store f into '" + outputFileName + "_3';";
+
+         String singleStoreScript = "a = load '"+ inputFileName + "';" +
+                 "b = filter a by $0 == 'hey';" +
+                 "store b into '" + outputFileName + "_1';" ;
+
+         for (ExecType execType : modes) {
+             for(boolean isMultiStore: new boolean[] { true, false}) {
+                 String script = isMultiStore ? multiStoreScript : singleStoreScript;
+                 // we switch between mapreduce and local mode, so the
+                 // FileLocalizer has to be reset before each run.
+                 FileLocalizer.setInitialized(false);
+                 if(execType == ExecType.MAPREDUCE) {
+                     ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+                 } else {
+                     Properties props = new Properties();
+                     props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+                     ps = new PigServer(ExecType.LOCAL, props);
+                 }
+                 ps.getPigContext().getProperties().setProperty(PigConfiguration.PIG_OUTPUT_LAZY, "true");
+                 Util.deleteFile(ps.getPigContext(), TESTDIR);
+                 ps.setBatchOn();
+                 Util.createInputFile(ps.getPigContext(), inputFileName, inputData);
+                 Util.registerMultiLineQuery(ps, script);
+                 ps.executeBatch();
+                 // look inside every store location; 'f' follows a limit, so also check part-r
+                 for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) {
+                     for (String part : new String[] { "part-m-00000", "part-r-00000" }) {
+                         String output = outputFileName + "_" + i + "/" + part;
+                         assertFalse("For an empty output " + output + " should not exist in " +
+                                 execType + " mode", Util.exists(ps.getPigContext(), output));
+                     }
+                 }
+             }
+         }
+     } finally {
+         // ps is still null when the very first PigServer could not be created
+         if (ps != null) {
+             Util.deleteFile(ps.getPigContext(), TESTDIR);
+         }
+     }
+ }
// A UDF which always throws an Exception so that the job can fail
public static class FailUDF extends EvalFunc<String> {