You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2012/02/02 03:17:19 UTC

svn commit: r1239436 - in /pig/trunk: CHANGES.txt src/org/apache/pig/Main.java src/org/apache/pig/impl/util/Utils.java test/org/apache/pig/pigunit/pig/PigServer.java test/org/apache/pig/test/pigunit/TestPigTest.java

Author: daijy
Date: Thu Feb  2 02:17:18 2012
New Revision: 1239436

URL: http://svn.apache.org/viewvc?rev=1239436&view=rev
Log:
PIG-2456: Pig should have a pigrc to specify default script cache

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/Main.java
    pig/trunk/src/org/apache/pig/impl/util/Utils.java
    pig/trunk/test/org/apache/pig/pigunit/pig/PigServer.java
    pig/trunk/test/org/apache/pig/test/pigunit/TestPigTest.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1239436&r1=1239435&r2=1239436&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Feb  2 02:17:18 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2456: Pig should have a pigrc to specify default script cache (prkommireddi via daijy)
+
 PIG-2496: Cache resolved classes in PigContext (dvryaboy)
 
 PIG-2482: Integrate HCat DDL command into Pig (daijy)

Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1239436&r1=1239435&r2=1239436&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Thu Feb  2 02:17:18 2012
@@ -22,12 +22,14 @@ import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
+import java.io.SequenceInputStream;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.text.ParseException;
@@ -66,6 +68,7 @@ import org.apache.pig.impl.util.LogUtils
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.PropertiesUtil;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
 import org.apache.pig.parser.DryRunGruntParser;
 import org.apache.pig.scripting.ScriptEngine;
 import org.apache.pig.scripting.ScriptEngine.SupportedScriptLang;
@@ -360,7 +363,6 @@ static int run(String args[], PigProgres
             scriptState.registerListener(listener);
         }
 
-
         if(logFileName == null && !userSpecifiedLog) {
             logFileName = validateLogFile(properties.getProperty("pig.logfile"), null);
         }
@@ -387,7 +389,7 @@ static int run(String args[], PigProgres
             PigContext.initializeImportList((String)properties.get("udf.import.list"));
 
         PigContext.setClassLoader(pigContext.createCl(null));
-
+    
         // construct the parameter substitution preprocessor
         Grunt grunt = null;
         BufferedReader in;
@@ -416,8 +418,8 @@ static int run(String args[], PigProgres
                                 .getPath(), type.name().toLowerCase());
                 }
             }
-
-            in = new BufferedReader(new FileReader(localFileRet.file));
+            //Reader is created by first loading "pig.load.default.statements" or .pigbootup file if available
+            in = new BufferedReader(new InputStreamReader(Utils.getCompositeStream(new FileInputStream(localFileRet.file), properties)));
 
             // run parameter substitution preprocessor first
             substFile = file + ".substituted";
@@ -511,7 +513,8 @@ static int run(String args[], PigProgres
             }
             // Interactive
             mode = ExecMode.SHELL;
-            ConsoleReader reader = new ConsoleReader(System.in, new OutputStreamWriter(System.out));
+          //Reader is created by first loading "pig.load.default.statements" or .pigbootup file if available
+            ConsoleReader reader = new ConsoleReader(Utils.getCompositeStream(System.in, properties), new OutputStreamWriter(System.out));
             reader.setDefaultPrompt("grunt> ");
             final String HISTORYFILE = ".pig_history";
             String historyFile = System.getProperty("user.home") + File.separator  + HISTORYFILE;
@@ -545,8 +548,9 @@ static int run(String args[], PigProgres
                                 .getPath(), type.name().toLowerCase());
                 }
             }
-
-            in = new BufferedReader(new FileReader(localFileRet.file));
+            //Reader is created by first loading "pig.load.default.statements" or .pigbootup file if available
+            InputStream seqInputStream = Utils.getCompositeStream(new FileInputStream(localFileRet.file), properties);
+            in = new BufferedReader(new InputStreamReader(seqInputStream));
 
             // run parameter substitution preprocessor first
             substFile = remainders[0] + ".substituted";
@@ -983,26 +987,26 @@ private static SupportedScriptLang deter
 }
 
 private static int runEmbeddedScript(PigContext pigContext, String file, String engine)
-        throws IOException {
-    log.info("Run embedded script: " + engine);
-    pigContext.connect();
-    ScriptEngine scriptEngine = ScriptEngine.getInstance(engine);
-    Map<String, List<PigStats>> statsMap = scriptEngine.run(pigContext, file);
-    PigStatsUtil.setStatsMap(statsMap);
-
-    int failCount = 0;
-    int totalCount = 0;
-    for (List<PigStats> lst : statsMap.values()) {
-        if (lst != null && !lst.isEmpty()) {
-            for (PigStats stats : lst) {
-                if (!stats.isSuccessful()) failCount++;
-                totalCount++;
-            }
-        }
-    }
-    return (totalCount > 0 && failCount == totalCount) ? ReturnCode.FAILURE
-            : (failCount > 0) ? ReturnCode.PARTIAL_FAILURE
-                    : ReturnCode.SUCCESS;
+throws IOException {
+	log.info("Run embedded script: " + engine);
+	pigContext.connect();
+	ScriptEngine scriptEngine = ScriptEngine.getInstance(engine);
+	Map<String, List<PigStats>> statsMap = scriptEngine.run(pigContext, file);
+	PigStatsUtil.setStatsMap(statsMap);
+
+	int failCount = 0;
+	int totalCount = 0;
+	for (List<PigStats> lst : statsMap.values()) {
+		if (lst != null && !lst.isEmpty()) {
+			for (PigStats stats : lst) {
+				if (!stats.isSuccessful()) failCount++;
+				totalCount++;
+			}
+		}
+	}
+	return (totalCount > 0 && failCount == totalCount) ? ReturnCode.FAILURE
+			: (failCount > 0) ? ReturnCode.PARTIAL_FAILURE
+					: ReturnCode.SUCCESS;
 }
 
 }

Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1239436&r1=1239435&r2=1239436&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Thu Feb  2 02:17:18 2012
@@ -18,11 +18,19 @@
 package org.apache.pig.impl.util;
 
 import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Properties;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -51,7 +59,7 @@ import com.google.common.collect.Lists;
  * Class with utility static methods
  */
 public class Utils {
-
+	private static final Log log = LogFactory.getLog(Utils.class);
     /**
      * This method is a helper for classes to implement {@link java.lang.Object#equals(java.lang.Object)}
      * checks if two objects are equals - two levels of checks are
@@ -268,5 +276,18 @@ public class Utils {
         
         return result;
     }
+    
+   public static InputStream getCompositeStream(InputStream in, Properties properties) {
+	   //Bootup file path comes from "pig.load.default.statements"; falls back to ~/.pigbootup when the user has not set that property
+    	final String bootupFile = properties.getProperty("pig.load.default.statements", System.getProperty("user.home") + "/.pigbootup");
+    	try {
+    	final InputStream inputSteam = new FileInputStream(new File(bootupFile));
+    	return new SequenceInputStream(inputSteam, in);
+    	} catch(FileNotFoundException fe) {
+    		log.info("Default bootup file " +bootupFile+ " not found");
+    		return in;
+    	}
+    }
+    
 
 }

Modified: pig/trunk/test/org/apache/pig/pigunit/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/pigunit/pig/PigServer.java?rev=1239436&r1=1239435&r2=1239436&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/pigunit/pig/PigServer.java (original)
+++ pig/trunk/test/org/apache/pig/pigunit/pig/PigServer.java Thu Feb  2 02:17:18 2012
@@ -12,15 +12,17 @@
  */
 package org.apache.pig.pigunit.pig;
 
-import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileNotFoundException;
-import java.io.FileReader;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.pig.ExecType;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.util.Utils;
 
 /**
  * Slightly modified PigServer that accepts a list of Pig aliases to override.
@@ -47,7 +49,8 @@ public class PigServer extends org.apach
   public void registerScript(String fileName, Map<String, String> aliasOverride)
       throws IOException {
     try {
-      GruntParser grunt = new GruntParser(new FileReader(new File(fileName)), aliasOverride);
+      InputStream compositeStream = Utils.getCompositeStream(new FileInputStream(fileName), pigContext.getProperties());
+      GruntParser grunt = new GruntParser(new InputStreamReader(compositeStream), aliasOverride);
       grunt.setInteractive(false);
       grunt.setParams(this);
       grunt.parseStopOnError(true);

Modified: pig/trunk/test/org/apache/pig/test/pigunit/TestPigTest.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/pigunit/TestPigTest.java?rev=1239436&r1=1239435&r2=1239436&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/pigunit/TestPigTest.java (original)
+++ pig/trunk/test/org/apache/pig/test/pigunit/TestPigTest.java Thu Feb  2 02:17:18 2012
@@ -13,12 +13,17 @@
 package org.apache.pig.test.pigunit;
 
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
+import java.io.PrintWriter;
 
 import junit.framework.TestCase;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.pig.ExecType;
 import org.apache.pig.pigunit.Cluster;
 import org.apache.pig.pigunit.PigTest;
 import org.apache.pig.pigunit.pig.PigServer;
@@ -41,7 +46,8 @@ public class TestPigTest {
   private PigTest test;
   private static Cluster cluster;
   private static final String PIG_SCRIPT = "test/data/pigunit/top_queries.pig";
-
+  private static final Log LOG = LogFactory.getLog(TestPigTest.class);
+  
   @BeforeClass
   public static void setUpOnce() throws IOException {
     cluster = PigTest.getCluster();
@@ -298,4 +304,78 @@ public class TestPigTest {
 
     test.assertOutput(new File("data/top_queries_expected_top_3.txt"));
   }
+
+  /**
+ * This is a test for default bootup. PIG-2456
+ * @throws IOException
+ */
+
+  @Test
+  public void testDefaultBootup() throws ParseException, IOException {
+	  //Test with properties file
+	  String pigProps = "pig.properties";
+	  String bootupPath = "/tmp/.temppigbootup";
+	  File propertyFile = new File(pigProps);
+	  PrintWriter out = new PrintWriter(new FileWriter(propertyFile));
+	  out.println("pig.load.default.statements="+bootupPath);
+	  out.close();
+
+	  File bootupFile = new File(bootupPath);
+	  out = new PrintWriter(new FileWriter(bootupFile));
+	  out.println("data = LOAD 'top_queries_input_data.txt' AS (query:CHARARRAY, count:INT);");
+	  out.close();
+
+	  String[] script = {
+			  //The following line is commented out because the test creates a bootup file that contains it instead. PigTests (and Pig scripts in general) read the bootup file to load default statements
+			  //"data = LOAD 'top_queries_input_data.txt' AS (query:CHARARRAY, count:INT);",   
+			  "queries_group = GROUP data BY query PARALLEL 1;",
+			  "queries_sum = FOREACH queries_group GENERATE group AS query, SUM(data.count) AS count;",
+			  "queries_ordered = ORDER queries_sum BY count DESC PARALLEL 1;",
+			  "queries_limit = LIMIT queries_ordered 3;",
+			  "STORE queries_limit INTO 'top_3_queries';",
+	  };
+
+	  String scriptPath = "/tmp/tempScript";
+	  File scriptFile = new File(scriptPath);
+	  out = new PrintWriter(new FileWriter(scriptFile));
+	  for(String line : script) {
+		  out.println(line);
+	  }
+	  out.close();
+
+
+	  String[] args = {
+			  "n=3",
+			  "reducers=1",
+			  "input=top_queries_input_data.txt",
+			  "output=top_3_queries",
+	  };
+
+	  //Create a pigunit.pig.PigServer and Cluster to run this test.
+	  PigServer pig = null;
+	  if (System.getProperties().containsKey("pigunit.exectype.cluster")) {
+		  LOG.info("Using cluster mode");
+		  pig = new PigServer(ExecType.MAPREDUCE);
+	  } else {
+		  LOG.info("Using default local mode");
+		  pig = new PigServer(ExecType.LOCAL);
+	  }
+
+	  final Cluster cluster = new Cluster(pig.getPigContext());
+
+	  test = new PigTest(scriptPath, args, pig, cluster);
+
+	  String[] output = {
+			  "(yahoo,25)",
+			  "(facebook,15)",
+			  "(twitter,7)",
+	  };
+
+	  test.assertOutput("queries_limit", output);
+
+	  propertyFile.delete();
+	  scriptFile.delete();
+	  bootupFile.delete();
+  }
+
 }