You are viewing a plain-text version of this content. The canonical version is the original mailing-list archive page (its hyperlink was not preserved in this plain-text copy).
Posted to commits@pig.apache.org by da...@apache.org on 2012/02/02 03:17:19 UTC
svn commit: r1239436 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/Main.java src/org/apache/pig/impl/util/Utils.java
test/org/apache/pig/pigunit/pig/PigServer.java
test/org/apache/pig/test/pigunit/TestPigTest.java
Author: daijy
Date: Thu Feb 2 02:17:18 2012
New Revision: 1239436
URL: http://svn.apache.org/viewvc?rev=1239436&view=rev
Log:
PIG-2456: Pig should have a pigrc to specify default script cache
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/Main.java
pig/trunk/src/org/apache/pig/impl/util/Utils.java
pig/trunk/test/org/apache/pig/pigunit/pig/PigServer.java
pig/trunk/test/org/apache/pig/test/pigunit/TestPigTest.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1239436&r1=1239435&r2=1239436&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Feb 2 02:17:18 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2456: Pig should have a pigrc to specify default script cache (prkommireddi via daijy)
+
PIG-2496: Cache resolved classes in PigContext (dvryaboy)
PIG-2482: Integrate HCat DDL command into Pig (daijy)
Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1239436&r1=1239435&r2=1239436&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Thu Feb 2 02:17:18 2012
@@ -22,12 +22,14 @@ import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
+import java.io.SequenceInputStream;
import java.io.StringReader;
import java.io.StringWriter;
import java.text.ParseException;
@@ -66,6 +68,7 @@ import org.apache.pig.impl.util.LogUtils
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.PropertiesUtil;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
import org.apache.pig.parser.DryRunGruntParser;
import org.apache.pig.scripting.ScriptEngine;
import org.apache.pig.scripting.ScriptEngine.SupportedScriptLang;
@@ -360,7 +363,6 @@ static int run(String args[], PigProgres
scriptState.registerListener(listener);
}
-
if(logFileName == null && !userSpecifiedLog) {
logFileName = validateLogFile(properties.getProperty("pig.logfile"), null);
}
@@ -387,7 +389,7 @@ static int run(String args[], PigProgres
PigContext.initializeImportList((String)properties.get("udf.import.list"));
PigContext.setClassLoader(pigContext.createCl(null));
-
+
// construct the parameter substitution preprocessor
Grunt grunt = null;
BufferedReader in;
@@ -416,8 +418,8 @@ static int run(String args[], PigProgres
.getPath(), type.name().toLowerCase());
}
}
-
- in = new BufferedReader(new FileReader(localFileRet.file));
+ //Reader is created by first loading "pig.load.default.statements" or .pigbootup file if available
+ in = new BufferedReader(new InputStreamReader(Utils.getCompositeStream(new FileInputStream(localFileRet.file), properties)));
// run parameter substitution preprocessor first
substFile = file + ".substituted";
@@ -511,7 +513,8 @@ static int run(String args[], PigProgres
}
// Interactive
mode = ExecMode.SHELL;
- ConsoleReader reader = new ConsoleReader(System.in, new OutputStreamWriter(System.out));
+ //Reader is created by first loading "pig.load.default.statements" or .pigbootup file if available
+ ConsoleReader reader = new ConsoleReader(Utils.getCompositeStream(System.in, properties), new OutputStreamWriter(System.out));
reader.setDefaultPrompt("grunt> ");
final String HISTORYFILE = ".pig_history";
String historyFile = System.getProperty("user.home") + File.separator + HISTORYFILE;
@@ -545,8 +548,9 @@ static int run(String args[], PigProgres
.getPath(), type.name().toLowerCase());
}
}
-
- in = new BufferedReader(new FileReader(localFileRet.file));
+ //Reader is created by first loading "pig.load.default.statements" or .pigbootup file if available
+ InputStream seqInputStream = Utils.getCompositeStream(new FileInputStream(localFileRet.file), properties);
+ in = new BufferedReader(new InputStreamReader(seqInputStream));
// run parameter substitution preprocessor first
substFile = remainders[0] + ".substituted";
@@ -983,26 +987,26 @@ private static SupportedScriptLang deter
}
private static int runEmbeddedScript(PigContext pigContext, String file, String engine)
- throws IOException {
- log.info("Run embedded script: " + engine);
- pigContext.connect();
- ScriptEngine scriptEngine = ScriptEngine.getInstance(engine);
- Map<String, List<PigStats>> statsMap = scriptEngine.run(pigContext, file);
- PigStatsUtil.setStatsMap(statsMap);
-
- int failCount = 0;
- int totalCount = 0;
- for (List<PigStats> lst : statsMap.values()) {
- if (lst != null && !lst.isEmpty()) {
- for (PigStats stats : lst) {
- if (!stats.isSuccessful()) failCount++;
- totalCount++;
- }
- }
- }
- return (totalCount > 0 && failCount == totalCount) ? ReturnCode.FAILURE
- : (failCount > 0) ? ReturnCode.PARTIAL_FAILURE
- : ReturnCode.SUCCESS;
+throws IOException {
+ log.info("Run embedded script: " + engine);
+ pigContext.connect();
+ ScriptEngine scriptEngine = ScriptEngine.getInstance(engine);
+ Map<String, List<PigStats>> statsMap = scriptEngine.run(pigContext, file);
+ PigStatsUtil.setStatsMap(statsMap);
+
+ int failCount = 0;
+ int totalCount = 0;
+ for (List<PigStats> lst : statsMap.values()) {
+ if (lst != null && !lst.isEmpty()) {
+ for (PigStats stats : lst) {
+ if (!stats.isSuccessful()) failCount++;
+ totalCount++;
+ }
+ }
+ }
+ return (totalCount > 0 && failCount == totalCount) ? ReturnCode.FAILURE
+ : (failCount > 0) ? ReturnCode.PARTIAL_FAILURE
+ : ReturnCode.SUCCESS;
}
}
Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1239436&r1=1239435&r2=1239436&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Thu Feb 2 02:17:18 2012
@@ -18,11 +18,19 @@
package org.apache.pig.impl.util;
import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
+import java.util.Properties;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -51,7 +59,7 @@ import com.google.common.collect.Lists;
* Class with utility static methods
*/
public class Utils {
-
+ private static final Log log = LogFactory.getLog(Utils.class);
/**
* This method is a helper for classes to implement {@link java.lang.Object#equals(java.lang.Object)}
* checks if two objects are equals - two levels of checks are
@@ -268,5 +276,18 @@ public class Utils {
return result;
}
+
+ public static InputStream getCompositeStream(InputStream in, Properties properties) {
+ //Load default ~/.pigbootup if not specified by user
+ final String bootupFile = properties.getProperty("pig.load.default.statements", System.getProperty("user.home") + "/.pigbootup");
+ try {
+ final InputStream inputSteam = new FileInputStream(new File(bootupFile));
+ return new SequenceInputStream(inputSteam, in);
+ } catch(FileNotFoundException fe) {
+ log.info("Default bootup file " +bootupFile+ " not found");
+ return in;
+ }
+ }
+
}
Modified: pig/trunk/test/org/apache/pig/pigunit/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/pigunit/pig/PigServer.java?rev=1239436&r1=1239435&r2=1239436&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/pigunit/pig/PigServer.java (original)
+++ pig/trunk/test/org/apache/pig/pigunit/pig/PigServer.java Thu Feb 2 02:17:18 2012
@@ -12,15 +12,17 @@
*/
package org.apache.pig.pigunit.pig;
-import java.io.File;
+import java.io.FileInputStream;
import java.io.FileNotFoundException;
-import java.io.FileReader;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.util.Map;
import java.util.Properties;
import org.apache.pig.ExecType;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.util.Utils;
/**
* Slightly modified PigServer that accepts a list of Pig aliases to override.
@@ -47,7 +49,8 @@ public class PigServer extends org.apach
public void registerScript(String fileName, Map<String, String> aliasOverride)
throws IOException {
try {
- GruntParser grunt = new GruntParser(new FileReader(new File(fileName)), aliasOverride);
+ InputStream compositeStream = Utils.getCompositeStream(new FileInputStream(fileName), pigContext.getProperties());
+ GruntParser grunt = new GruntParser(new InputStreamReader(compositeStream), aliasOverride);
grunt.setInteractive(false);
grunt.setParams(this);
grunt.parseStopOnError(true);
Modified: pig/trunk/test/org/apache/pig/test/pigunit/TestPigTest.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/pigunit/TestPigTest.java?rev=1239436&r1=1239435&r2=1239436&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/pigunit/TestPigTest.java (original)
+++ pig/trunk/test/org/apache/pig/test/pigunit/TestPigTest.java Thu Feb 2 02:17:18 2012
@@ -13,12 +13,17 @@
package org.apache.pig.test.pigunit;
import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
+import java.io.PrintWriter;
import junit.framework.TestCase;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
+import org.apache.pig.ExecType;
import org.apache.pig.pigunit.Cluster;
import org.apache.pig.pigunit.PigTest;
import org.apache.pig.pigunit.pig.PigServer;
@@ -41,7 +46,8 @@ public class TestPigTest {
private PigTest test;
private static Cluster cluster;
private static final String PIG_SCRIPT = "test/data/pigunit/top_queries.pig";
-
+ private static final Log LOG = LogFactory.getLog(TestPigTest.class);
+
@BeforeClass
public static void setUpOnce() throws IOException {
cluster = PigTest.getCluster();
@@ -298,4 +304,78 @@ public class TestPigTest {
test.assertOutput(new File("data/top_queries_expected_top_3.txt"));
}
+
+ /**
+ * This is a test for default bootup. PIG-2456
+ * @throws IOException
+ */
+
+ @Test
+ public void testDefaultBootup() throws ParseException, IOException {
+ //Test with properties file
+ String pigProps = "pig.properties";
+ String bootupPath = "/tmp/.temppigbootup";
+ File propertyFile = new File(pigProps);
+ PrintWriter out = new PrintWriter(new FileWriter(propertyFile));
+ out.println("pig.load.default.statements="+bootupPath);
+ out.close();
+
+ File bootupFile = new File(bootupPath);
+ out = new PrintWriter(new FileWriter(bootupFile));
+ out.println("data = LOAD 'top_queries_input_data.txt' AS (query:CHARARRAY, count:INT);");
+ out.close();
+
+ String[] script = {
+ //The following line is commented as the test creates a bootup file which contains it instead. PigTests (and Pig scripts in general) will read the bootup file to load default statements
+ //"data = LOAD 'top_queries_input_data.txt' AS (query:CHARARRAY, count:INT);",
+ "queries_group = GROUP data BY query PARALLEL 1;",
+ "queries_sum = FOREACH queries_group GENERATE group AS query, SUM(data.count) AS count;",
+ "queries_ordered = ORDER queries_sum BY count DESC PARALLEL 1;",
+ "queries_limit = LIMIT queries_ordered 3;",
+ "STORE queries_limit INTO 'top_3_queries';",
+ };
+
+ String scriptPath = "/tmp/tempScript";
+ File scriptFile = new File(scriptPath);
+ out = new PrintWriter(new FileWriter(scriptFile));
+ for(String line : script) {
+ out.println(line);
+ }
+ out.close();
+
+
+ String[] args = {
+ "n=3",
+ "reducers=1",
+ "input=top_queries_input_data.txt",
+ "output=top_3_queries",
+ };
+
+ //Create a pigunit.pig.PigServer and Cluster to run this test.
+ PigServer pig = null;
+ if (System.getProperties().containsKey("pigunit.exectype.cluster")) {
+ LOG.info("Using cluster mode");
+ pig = new PigServer(ExecType.MAPREDUCE);
+ } else {
+ LOG.info("Using default local mode");
+ pig = new PigServer(ExecType.LOCAL);
+ }
+
+ final Cluster cluster = new Cluster(pig.getPigContext());
+
+ test = new PigTest(scriptPath, args, pig, cluster);
+
+ String[] output = {
+ "(yahoo,25)",
+ "(facebook,15)",
+ "(twitter,7)",
+ };
+
+ test.assertOutput("queries_limit", output);
+
+ propertyFile.delete();
+ scriptFile.delete();
+ bootupFile.delete();
+ }
+
}