You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/07/19 03:43:32 UTC
svn commit: r678090 [1/2] - in /incubator/pig/branches/types:
lib-src/shock/org/apache/pig/shock/ src/org/apache/pig/
src/org/apache/pig/backend/executionengine/
src/org/apache/pig/backend/hadoop/datastorage/
src/org/apache/pig/backend/hadoop/execution...
Author: olga
Date: Fri Jul 18 18:43:31 2008
New Revision: 678090
URL: http://svn.apache.org/viewvc?rev=678090&view=rev
Log:
merge of PIG-111-215-236-18-266-291
Added:
incubator/pig/branches/types/src/org/apache/pig/impl/util/ConfigurationValidator.java
Modified:
incubator/pig/branches/types/lib-src/shock/org/apache/pig/shock/SSHSocketImplFactory.java
incubator/pig/branches/types/src/org/apache/pig/Main.java
incubator/pig/branches/types/src/org/apache/pig/PigServer.java
incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java
incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecException.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java
incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java
incubator/pig/branches/types/test/org/apache/pig/test/MiniCluster.java
incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java
incubator/pig/branches/types/test/org/apache/pig/test/TestCombiner.java
incubator/pig/branches/types/test/org/apache/pig/test/TestCompressedFiles.java
incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
incubator/pig/branches/types/test/org/apache/pig/test/TestFilterOpNumeric.java
incubator/pig/branches/types/test/org/apache/pig/test/TestFilterOpString.java
incubator/pig/branches/types/test/org/apache/pig/test/TestGrunt.java
incubator/pig/branches/types/test/org/apache/pig/test/TestInfixArithmetic.java
incubator/pig/branches/types/test/org/apache/pig/test/TestInputOutputFileValidator.java
incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLargeFile.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java
incubator/pig/branches/types/test/org/apache/pig/test/TestOrderBy.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPi.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPigScriptParser.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPigServer.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPigSplit.java
incubator/pig/branches/types/test/org/apache/pig/test/TestStoreOld.java
incubator/pig/branches/types/test/org/apache/pig/test/Util.java
incubator/pig/branches/types/test/org/apache/pig/test/utils/LogicalPlanTester.java
Modified: incubator/pig/branches/types/lib-src/shock/org/apache/pig/shock/SSHSocketImplFactory.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/lib-src/shock/org/apache/pig/shock/SSHSocketImplFactory.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/lib-src/shock/org/apache/pig/shock/SSHSocketImplFactory.java (original)
+++ incubator/pig/branches/types/lib-src/shock/org/apache/pig/shock/SSHSocketImplFactory.java Fri Jul 18 18:43:31 2008
@@ -30,6 +30,7 @@
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketImpl;
+import java.net.SocketOptions;
import java.net.SocketImplFactory;
import java.net.UnknownHostException;
import java.net.Proxy.Type;
@@ -436,7 +437,10 @@
}
public Object getOption(int optID) throws SocketException {
- throw new SocketException("SSHSocketImpl does not implement getOption");
+ if (optID == SocketOptions.SO_SNDBUF)
+ return new Integer(1024);
+ else
+ throw new SocketException("SSHSocketImpl does not implement getOption for " + optID);
}
/**
Modified: incubator/pig/branches/types/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/Main.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/Main.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/Main.java Fri Jul 18 18:43:31 2008
@@ -33,9 +33,12 @@
import org.apache.log4j.PatternLayout;
import org.apache.log4j.PropertyConfigurator;
import org.apache.pig.ExecType;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.PropertiesUtil;
import org.apache.pig.tools.cmdline.CmdLineParser;
import org.apache.pig.tools.grunt.Grunt;
import org.apache.pig.tools.timer.PerformanceTimerFactory;
@@ -46,7 +49,13 @@
private final static Log log = LogFactory.getLog(Main.class);
-private enum ExecMode {STRING, FILE, SHELL, UNKNOWN};
+ private static final String LOG4J_CONF = "log4jconf";
+ private static final String BRIEF = "brief";
+ private static final String DEBUG = "debug";
+ private static final String JAR = "jar";
+ private static final String VERBOSE = "verbose";
+
+ private enum ExecMode {STRING, FILE, SHELL, UNKNOWN};
/**
* The Main-Class for the Pig Jar that will provide a shell and setup a classpath appropriate
@@ -61,26 +70,17 @@
public static void main(String args[])
{
int rc = 1;
- PigContext pigContext = new PigContext();
+ Properties properties = new Properties();
+ PropertiesUtil.loadPropertiesFromFile(properties);
try {
- BufferedReader in = null;
BufferedReader pin = null;
- ExecMode mode = ExecMode.UNKNOWN;
- int port = 0;
- String file = null;
- Level logLevel = Level.INFO;
- boolean brief = false;
- String log4jconf = null;
- boolean verbose = false;
boolean debug = false;
boolean dryrun = false;
ArrayList<String> params = new ArrayList<String>();
ArrayList<String> paramFiles = new ArrayList<String>();
CmdLineParser opts = new CmdLineParser(args);
- // Don't use -l, --latest, -c, --cluster, -cp, -classpath, -D as these
- // are masked by the startup perl script.
opts.registerOpt('4', "log4jconf", CmdLineParser.ValueExpected.REQUIRED);
opts.registerOpt('b', "brief", CmdLineParser.ValueExpected.NOT_ACCEPTED);
opts.registerOpt('c', "cluster", CmdLineParser.ValueExpected.REQUIRED);
@@ -97,32 +97,48 @@
opts.registerOpt('m', "param_file", CmdLineParser.ValueExpected.OPTIONAL);
opts.registerOpt('r', "dryrun", CmdLineParser.ValueExpected.NOT_ACCEPTED);
+ ExecMode mode = ExecMode.UNKNOWN;
+ String file = null;
+ ExecType execType = ExecType.MAPREDUCE ;
+ String execTypeString = properties.getProperty("exectype");
+ if(execTypeString!=null && execTypeString.length()>0){
+ execType = PigServer.parseExecType(execTypeString);
+ }
+ String cluster = "local";
+ String clusterConfigured = properties.getProperty("cluster");
+ if(clusterConfigured != null && clusterConfigured.length() > 0){
+ cluster = clusterConfigured;
+ }
+
char opt;
while ((opt = opts.getNextOpt()) != CmdLineParser.EndOfOpts) {
switch (opt) {
case '4':
- log4jconf = opts.getValStr();
+ String log4jconf = opts.getValStr();
+ if(log4jconf != null){
+ properties.setProperty(LOG4J_CONF, log4jconf);
+ }
break;
case 'b':
- brief = true;
+ properties.setProperty(BRIEF, "true");
break;
- case 'c': {
+ case 'c':
// Needed away to specify the cluster to run the MR job on
// Bug 831708 - fixed
- String cluster = opts.getValStr();
- System.out.println("Changing MR cluster to " + cluster);
- if(cluster.indexOf(':') < 0) {
- cluster = cluster + ":50020";
- }
- pigContext.setJobtrackerLocation(cluster);
+ String clusterParameter = opts.getValStr();
+ if (clusterParameter != null && clusterParameter.length() > 0) {
+ cluster = clusterParameter;
+ }
break;
- }
case 'd':
+ String logLevel = opts.getValStr();
+ if (logLevel != null) {
+ properties.setProperty(DEBUG, logLevel);
+ }
debug = true;
- logLevel = Level.toLevel(opts.getValStr(), Level.INFO);
break;
case 'e':
@@ -138,29 +154,26 @@
usage();
return;
- case 'j': {
- String splits[] = opts.getValStr().split(":", -1);
- for (int i = 0; i < splits.length; i++) {
- if (splits[i].length() > 0) {
- pigContext.addJar(splits[i]);
- }
- }
+ case 'j':
+ String jarsString = opts.getValStr();
+ if(jarsString != null){
+ properties.setProperty(JAR, jarsString);
+ }
break;
- }
case 'm':
paramFiles.add(opts.getValStr());
break;
- case 'o': {
- String gateway = System.getProperty("ssh.gateway");
- if (gateway == null || gateway.length() == 0) {
- System.setProperty("hod.server", "local");
- } else {
- System.setProperty("hod.server", System.getProperty("ssh.gateway"));
- }
+ case 'o':
+ // TODO sgroschupf using system properties is always a very bad idea
+ String gateway = System.getProperty("ssh.gateway");
+ if (gateway == null || gateway.length() == 0) {
+ properties.setProperty("hod.server", "local");
+ } else {
+ properties.setProperty("hod.server", System.getProperty("ssh.gateway"));
+ }
break;
- }
case 'p':
String val = opts.getValStr();
@@ -174,17 +187,15 @@
break;
case 'v':
- verbose = true;
+ properties.setProperty(VERBOSE, ""+true);
break;
case 'x':
- ExecType exectype;
- try {
- exectype = PigServer.parseExecType(opts.getValStr());
- } catch (IOException e) {
- throw new RuntimeException("ERROR: Unrecognized exectype.", e);
- }
- pigContext.setExecType(exectype);
+ try {
+ execType = PigServer.parseExecType(opts.getValStr());
+ } catch (IOException e) {
+ throw new RuntimeException("ERROR: Unrecognized exectype.", e);
+ }
break;
case 'i':
System.out.println(getVersionString());
@@ -195,44 +206,16 @@
}
}
}
+ // configure logging
+ configureLog4J(properties);
+ // create the context with the parameter
+ PigContext pigContext = new PigContext(execType, properties);
LogicalPlanBuilder.classloader = pigContext.createCl(null);
- if (log4jconf != null) {
- PropertyConfigurator.configure(log4jconf);
- } else if (!brief) {
- // non-brief logging - timestamps
- Properties props = new Properties();
- props.setProperty("log4j.rootLogger", "INFO, PIGCONSOLE");
- props.setProperty("log4j.appender.PIGCONSOLE",
- "org.apache.log4j.ConsoleAppender");
- props.setProperty("log4j.appender.PIGCONSOLE.layout",
- "org.apache.log4j.PatternLayout");
- props.setProperty("log4j.appender.PIGCONSOLE.layout.ConversionPattern",
- "%d [%t] %-5p %c - %m%n");
- PropertyConfigurator.configure(props);
- // Set the log level/threshold
- Logger.getRootLogger().setLevel(verbose ? Level.ALL : logLevel);
- } else {
- // brief logging - no timestamps
- Properties props = new Properties();
- props.setProperty("log4j.rootLogger", "INFO, PIGCONSOLE");
- props.setProperty("log4j.appender.PIGCONSOLE",
- "org.apache.log4j.ConsoleAppender");
- props.setProperty("log4j.appender.PIGCONSOLE.layout",
- "org.apache.log4j.PatternLayout");
- props.setProperty("log4j.appender.PIGCONSOLE.layout.ConversionPattern",
- "%m%n");
- PropertyConfigurator.configure(props);
- // Set the log level/threshold
- Logger.getRootLogger().setLevel(verbose ? Level.ALL : logLevel);
- }
-
- // TODO Add a file appender for the logs
- // TODO Need to create a property in the properties file for it.
-
// construct the parameter subsitution preprocessor
Grunt grunt = null;
+ BufferedReader in;
String substFile = null;
switch (mode) {
case FILE:
@@ -321,14 +304,65 @@
usage();
rc = 1;
} catch (Throwable e) {
+ //log.error(e);
+ // this is a hack to see full error till we resolve commons logging config
e.printStackTrace();
- log.error(e);
} finally {
+ // clear temp files
+ FileLocalizer.deleteTempFiles();
PerformanceTimerFactory.getPerfTimerFactory().dumpTimers();
System.exit(rc);
}
}
+//TODO jz: log4j.properties should be used instead
+private static void configureLog4J(Properties properties) {
+ // TODO Add a file appender for the logs
+ // TODO Need to create a property in the properties file for it.
+ // sgroschupf, 25Feb2008: this method will be obsolete with PIG-115.
+
+ String log4jconf = properties.getProperty(LOG4J_CONF);
+ String trueString = "true";
+ boolean brief = trueString.equalsIgnoreCase(properties.getProperty(BRIEF));
+ boolean verbose = trueString.equalsIgnoreCase(properties.getProperty(VERBOSE));
+ Level logLevel = Level.INFO;
+
+ String logLevelString = properties.getProperty(DEBUG);
+ if (logLevelString != null){
+ logLevel = Level.toLevel(logLevelString, Level.INFO);
+ }
+
+ if (log4jconf != null) {
+ PropertyConfigurator.configure(log4jconf);
+ } else if (!brief ) {
+ // non-brief logging - timestamps
+ Properties props = new Properties();
+ props.setProperty("log4j.rootLogger", "INFO, PIGCONSOLE");
+ props.setProperty("log4j.appender.PIGCONSOLE",
+ "org.apache.log4j.ConsoleAppender");
+ props.setProperty("log4j.appender.PIGCONSOLE.layout",
+ "org.apache.log4j.PatternLayout");
+ props.setProperty("log4j.appender.PIGCONSOLE.layout.ConversionPattern",
+ "%d [%t] %-5p %c - %m%n");
+ PropertyConfigurator.configure(props);
+ // Set the log level/threshold
+ Logger.getRootLogger().setLevel(verbose ? Level.ALL : logLevel);
+ } else {
+ // brief logging - no timestamps
+ Properties props = new Properties();
+ props.setProperty("log4j.rootLogger", "INFO, PIGCONSOLE");
+ props.setProperty("log4j.appender.PIGCONSOLE",
+ "org.apache.log4j.ConsoleAppender");
+ props.setProperty("log4j.appender.PIGCONSOLE.layout",
+ "org.apache.log4j.PatternLayout");
+ props.setProperty("log4j.appender.PIGCONSOLE.layout.ConversionPattern",
+ "%m%n");
+ PropertyConfigurator.configure(props);
+ // Set the log level/threshold
+ Logger.getRootLogger().setLevel(verbose ? Level.ALL : logLevel);
+ }
+}
+
// retruns the stream of final pig script to be passed to Grunt
private static BufferedReader runParamPreprocessor(BufferedReader origPigScript, ArrayList<String> params,
ArrayList<String> paramFiles, String scriptFile, boolean createFile)
Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Fri Jul 18 18:43:31 2008
@@ -62,6 +62,7 @@
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.WrappedIOException;
+import org.apache.pig.impl.util.PropertiesUtil;
/**
@@ -105,16 +106,23 @@
return user + "-" + date;
}
- public PigServer(String execType) throws ExecException, IOException {
- this(parseExecType(execType));
+ public PigServer(String execTypeString) throws ExecException, IOException {
+ this(parseExecType(execTypeString));
}
+ public PigServer(ExecType execType) throws ExecException {
+ this(execType, PropertiesUtil.loadPropertiesFromFile());
+ }
+
public PigServer() throws ExecException {
this(ExecType.MAPREDUCE);
}
- public PigServer(ExecType execType) throws ExecException {
- this.pigContext = new PigContext(execType);
+ public PigServer(ExecType execType, Properties properties) throws ExecException {
+ this.pigContext = new PigContext(execType, properties);
+ if (this.pigContext.getProperties().getProperty(PigContext.JOB_NAME) == null) {
+ setJobName("DefaultJobName") ;
+ }
pigContext.connect();
}
@@ -256,7 +264,7 @@
}
public void setJobName(String name){
- pigContext.setJobName(name);
+ pigContext.getProperties().setProperty(PigContext.JOB_NAME, PigContext.JOB_NAME_PREFIX + ":" + name);
}
/**
Modified: incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java Fri Jul 18 18:43:31 2008
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Iterator;
+import java.util.Properties;
import java.util.Map;
import org.apache.commons.logging.Log;
@@ -40,7 +41,7 @@
public static void main(String args[]) throws IOException, ExecException {
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
- pig = new PigServer(ExecType.LOCAL);
+ pig = new PigServer(ExecType.LOCAL, new Properties());
while (true) {
System.out.print("> ");
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecException.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecException.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecException.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecException.java Fri Jul 18 18:43:31 2008
@@ -26,14 +26,14 @@
}
public ExecException() {
- this(null, null);
+ super();
}
public ExecException(String message) {
- this(message, null);
+ super(message);
}
public ExecException(Throwable cause) {
- this(null, cause);
+ super(cause);
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java Fri Jul 18 18:43:31 2008
@@ -32,49 +32,65 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.dfs.DistributedFileSystem;
import org.apache.hadoop.conf.Configuration;
-
-import org.apache.pig.backend.datastorage.*;
-
+import org.apache.pig.backend.datastorage.ContainerDescriptor;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.DataStorageException;
+import org.apache.pig.backend.datastorage.ElementDescriptor;
public class HDataStorage implements DataStorage {
-
+ private static final String FILE_SYSTEM_LOCATION = "fs.default.name";
+
private FileSystem fs;
-
- public HDataStorage(URI uri, Configuration conf) throws IOException {
- this(uri, new HConfiguration(conf));
- }
-
- public HDataStorage(URI uri, HConfiguration conf) throws IOException {
- fs = FileSystem.get(uri, conf.getConfiguration());
+ private Configuration configuration;
+ private Properties properties;
+ private URI uri;
+
+ public HDataStorage(URI uri, Properties properties) {
+ this.properties = properties;
+ this.uri = uri;
+ init();
}
- public HDataStorage(Configuration conf) throws IOException {
- this(new HConfiguration(conf));
+ public HDataStorage(Properties properties) {
+ this.properties = properties;
+ init();
}
-
- public HDataStorage(HConfiguration conf) throws IOException {
- fs = FileSystem.get(conf.getConfiguration());
+
+ public void init() {
+ // check if the name node is set; if not, we fall back to local
+ String nameNode = this.properties.getProperty(FILE_SYSTEM_LOCATION);
+ if (nameNode == null || nameNode.length() == 0) {
+ nameNode = "local";
+ }
+ this.configuration = ConfigurationUtil.toConfiguration(this.properties);
+ try {
+ if (this.uri != null) {
+ this.fs = FileSystem.get(this.uri, this.configuration);
+ } else {
+ this.fs = FileSystem.get(this.configuration);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create DataStorage", e);
+ }
+ short defaultReplication = fs.getDefaultReplication();
+ properties.setProperty(DEFAULT_REPLICATION_FACTOR_KEY, (new Short(
+ defaultReplication)).toString());
}
- public void init() { }
-
public void close() throws IOException {
fs.close();
}
public Properties getConfiguration() {
- Properties props = new HConfiguration(fs.getConf());
-
- short defaultReplication = fs.getDefaultReplication();
- props.setProperty(DEFAULT_REPLICATION_FACTOR_KEY,
- (new Short(defaultReplication)).toString());
-
- return props;
+ return this.properties;
}
-
- public void updateConfiguration(Properties newConfiguration)
- throws DataStorageException {
+
+ public void updateConfiguration(Properties newConfiguration)
+ throws DataStorageException {
+ // TODO sgroschupf 25Feb2008 this method is never called and
+ // I'm even not sure if hadoop would support that, I doubt it.
+
if (newConfiguration == null) {
return;
}
@@ -110,8 +126,7 @@
return stats;
}
- public ElementDescriptor asElement(String name)
- throws DataStorageException {
+ public ElementDescriptor asElement(String name) throws DataStorageException {
if (this.isContainer(name)) {
return new HDirectory(this, name);
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri Jul 18 18:43:31 2008
@@ -18,22 +18,31 @@
package org.apache.pig.backend.hadoop.executionengine;
+import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.FileOutputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.PrintStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketImplFactory;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
@@ -45,7 +54,7 @@
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
import org.apache.pig.backend.executionengine.ExecutionEngine;
-import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.impl.PigContext;
@@ -66,12 +75,18 @@
public class HExecutionEngine implements ExecutionEngine {
+ private static final String HOD_SERVER = "hod.server";
+ public static final String JOB_TRACKER_LOCATION = "mapred.job.tracker";
+ private static final String FILE_SYSTEM_LOCATION = "fs.default.name";
+
private final Log log = LogFactory.getLog(getClass());
+ private static final String LOCAL = "local";
+
+ private StringBuilder hodParams = null;
protected PigContext pigContext;
protected DataStorage ds;
- protected HConfiguration conf;
protected JobSubmissionProtocol jobTracker;
protected JobClient jobClient;
@@ -85,10 +100,8 @@
// map from LOGICAL key to into about the execution
protected Map<OperatorKey, MapRedResult> materializedResults;
- public HExecutionEngine(PigContext pigContext,
- HConfiguration conf) {
+ public HExecutionEngine(PigContext pigContext) {
this.pigContext = pigContext;
- this.conf = conf;
this.logicalToPhysicalKeys = new HashMap<OperatorKey, OperatorKey>();
this.physicalOpTable = new HashMap<OperatorKey, ExecPhysicalOperator>();
this.materializedResults = new HashMap<OperatorKey, MapRedResult>();
@@ -108,10 +121,6 @@
return this.materializedResults;
}
- public HExecutionEngine(PigContext pigContext) {
- this(pigContext, new HConfiguration(new JobConf()));
- }
-
public Map<OperatorKey, ExecPhysicalOperator> getPhysicalOpTable() {
return this.physicalOpTable;
}
@@ -120,69 +129,85 @@
public DataStorage getDataStorage() {
return this.ds;
}
-
- private void setJobtrackerLocation(String newLocation) {
- conf.put("mapred.job.tracker", newLocation);
- }
-
- private void setFilesystemLocation(String newLocation) {
- conf.put("fs.default.name", newLocation);
- }
public void init() throws ExecException {
+ init(this.pigContext.getProperties());
+ }
+
+ public void init(Properties properties) throws ExecException {
//First set the ssh socket factory
setSSHFactory();
- String hodServer = System.getProperty("hod.server");
+ String hodServer = properties.getProperty(HOD_SERVER);
+ String cluster = null;
+ String nameNode = null;
+ Configuration configuration = null;
if (hodServer != null && hodServer.length() > 0) {
- String hdfsAndMapred[] = doHod(hodServer);
- setFilesystemLocation(hdfsAndMapred[0]);
- setJobtrackerLocation(hdfsAndMapred[1]);
+ String hdfsAndMapred[] = doHod(hodServer, properties);
+ properties.setProperty(FILE_SYSTEM_LOCATION, hdfsAndMapred[0]);
+ properties.setProperty(JOB_TRACKER_LOCATION, hdfsAndMapred[1]);
}
else {
- String cluster = System.getProperty("cluster");
+
+ // We need to build a configuration object first in the manner described below
+ // and then get back a properties object to inspect the JOB_TRACKER_LOCATION
+ // and FILE_SYSTEM_LOCATION. The reason to do this is if we looked only at
+ // the existing properties object, we may not get the right settings. So we want
+ // to read the configurations in the order specified below and only then look
+ // for JOB_TRACKER_LOCATION and FILE_SYSTEM_LOCATION.
+
+ // Hadoop by default specifies two resources, loaded in-order from the classpath:
+ // 1. hadoop-default.xml : Read-only defaults for hadoop.
+ // 2. hadoop-site.xml: Site-specific configuration for a given hadoop installation.
+ // Now add the settings from "properties" object to override any existing properties
+ // All of the above is accomplished in the method call below
+ configuration = ConfigurationUtil.toConfiguration(properties);
+ properties = ConfigurationUtil.toProperties(configuration);
+ cluster = properties.getProperty(JOB_TRACKER_LOCATION);
+ nameNode = properties.getProperty(FILE_SYSTEM_LOCATION);
+
if (cluster != null && cluster.length() > 0) {
- if(cluster.indexOf(':') < 0) {
+ if(!cluster.contains(":") && !cluster.equalsIgnoreCase(LOCAL)) {
cluster = cluster + ":50020";
}
- setJobtrackerLocation(cluster);
+ properties.setProperty(JOB_TRACKER_LOCATION, cluster);
}
- String nameNode = System.getProperty("namenode");
if (nameNode!=null && nameNode.length() > 0) {
- if(nameNode.indexOf(':') < 0) {
+ if(!nameNode.contains(":") && !nameNode.equalsIgnoreCase(LOCAL)) {
nameNode = nameNode + ":8020";
}
- setFilesystemLocation(nameNode);
+ properties.setProperty(FILE_SYSTEM_LOCATION, nameNode);
}
}
- log.info("Connecting to hadoop file system at: " + conf.get("fs.default.name"));
-
- try {
- ds = new HDataStorage(conf);
- }
- catch (IOException e) {
- throw new ExecException("Failed to create DataStorage", e);
- }
-
- String jobTrackerName = conf.get("mapred.job.tracker").toString();
- log.info("Connecting to map-reduce job tracker at: " + jobTrackerName);
+ log.info("Connecting to hadoop file system at: " + (nameNode==null? LOCAL: nameNode) ) ;
+ ds = new HDataStorage(properties);
+
+ // The above HDataStorage constructor sets DEFAULT_REPLICATION_FACTOR_KEY in properties.
+ // So we need to reconstruct the configuration object for the non HOD case
+ // In the HOD case, this is the first time the configuration object will be created
+ configuration = ConfigurationUtil.toConfiguration(properties);
- try {
- if(!jobTrackerName.equalsIgnoreCase("local"))
- jobTracker = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
- JobSubmissionProtocol.versionID,
- JobTracker.getAddress(conf.getConfiguration()),
- conf.getConfiguration());
- }
- catch (IOException e) {
- throw new ExecException("Failed to crate job tracker", e);
+
+ if(cluster != null && !cluster.equalsIgnoreCase(LOCAL)){
+ log.info("Connecting to map-reduce job tracker at: " + properties.get(JOB_TRACKER_LOCATION));
+ if (!LOCAL.equalsIgnoreCase(cluster)) {
+ try {
+ jobTracker = (JobSubmissionProtocol) RPC.getProxy(
+ JobSubmissionProtocol.class,
+ JobSubmissionProtocol.versionID, JobTracker
+ .getAddress(configuration), configuration);
+ } catch (IOException e) {
+ throw new ExecException("Failed to crate job tracker", e);
+ }
+ }
}
try {
- jobClient = new JobClient(new JobConf(conf.getConfiguration()));
+ // Set job-specific configuration knobs
+ jobClient = new JobClient(new JobConf(configuration));
}
catch (IOException e) {
throw new ExecException("Failed to create job client", e);
@@ -190,26 +215,16 @@
}
public void close() throws ExecException {
- ;
+ closeHod(pigContext.getProperties().getProperty("hod.server"));
}
public Properties getConfiguration() throws ExecException {
- return this.conf;
+ return this.pigContext.getProperties();
}
public void updateConfiguration(Properties newConfiguration)
throws ExecException {
- Enumeration keys = newConfiguration.propertyNames();
-
- while (keys.hasMoreElements()) {
- Object obj = keys.nextElement();
-
- if (obj instanceof String) {
- String str = (String) obj;
-
- conf.put(str, newConfiguration.get(str));
- }
- }
+ init(newConfiguration);
}
public Map<String, Object> getStatistics() throws ExecException {
@@ -305,7 +320,8 @@
}
private void setSSHFactory(){
- String g = System.getProperty("ssh.gateway");
+ Properties properties = this.pigContext.getProperties();
+ String g = properties.getProperty("ssh.gateway");
if (g == null || g.length() == 0) return;
try {
Class clazz = Class.forName("org.apache.pig.shock.SSHSocketImplFactory");
@@ -321,136 +337,98 @@
//To prevent doing hod if the pig server is constructed multiple times
private static String hodMapRed;
private static String hodHDFS;
+ private String hodConfDir = null;
+ private String remoteHodConfDir = null;
+ private Process hodProcess = null;
- private enum ParsingState {
- NOTHING, HDFSUI, MAPREDUI, HDFS, MAPRED, HADOOPCONF
- };
+ class ShutdownThread extends Thread{
+ public synchronized void run() {
+ closeHod(pigContext.getProperties().getProperty("hod.server"));
+ }
+ }
- private String[] doHod(String server) throws ExecException {
+ private String[] doHod(String server, Properties properties) throws ExecException {
if (hodMapRed != null) {
return new String[] {hodHDFS, hodMapRed};
}
try {
- Process p = null;
- // Make the kryptonite released version the default if nothing
- // else is specified.
- StringBuilder cmd = new StringBuilder();
- cmd.append(System.getProperty("hod.expect.root"));
- cmd.append('/');
- cmd.append("libexec/pig/");
- cmd.append(System.getProperty("hod.expect.uselatest"));
- cmd.append('/');
- cmd.append(System.getProperty("hod.command"));
-
- String cluster = System.getProperty("yinst.cluster");
-
- // TODO This is a Yahoo specific holdover, need to remove
- // this.
- if (cluster != null && cluster.length() > 0 && !cluster.startsWith("kryptonite")) {
- cmd.append(" --config=");
- cmd.append(System.getProperty("hod.config.dir"));
- cmd.append('/');
- cmd.append(cluster);
- }
+ // first, create a temp directory to store the configuration
+ hodConfDir = createTempDir(server);
+
+ //jz: fall back to the system property because this is not handled in Main
+ hodParams = new StringBuilder(properties.getProperty(
+ "hod.param", System.getProperty("hod.param", "")));
+ // get the number of nodes out of the command or use default
+ int nodes = getNumNodes(hodParams);
+
+ // command format: hod allocate -d <cluster_dir> -n <number_of_nodes> <other params>
+ String[] fixedCmdArray = new String[] { "hod", "allocate", "-d",
+ hodConfDir, "-n", Integer.toString(nodes) };
+ String[] extraParams = hodParams.toString().split(" ");
+
+ String[] cmdarray = new String[fixedCmdArray.length + extraParams.length];
+ System.arraycopy(fixedCmdArray, 0, cmdarray, 0, fixedCmdArray.length);
+ System.arraycopy(extraParams, 0, cmdarray, fixedCmdArray.length, extraParams.length);
+
+ log.info("Connecting to HOD...");
+ log.debug("sending HOD command " + cmdToString(cmdarray));
- cmd.append(" " + System.getProperty("hod.param", ""));
+ // setup shutdown hook to make sure we tear down hod connection
+ Runtime.getRuntime().addShutdownHook(new ShutdownThread());
- if (server.equals("local")) {
- p = Runtime.getRuntime().exec(cmd.toString());
- }
- else {
- SSHSocketImplFactory fac = SSHSocketImplFactory.getFactory(server);
- p = fac.ssh(cmd.toString());
- }
-
- InputStream is = p.getInputStream();
+ hodProcess = runCommand(server, cmdarray);
- log.info("Connecting to HOD...");
- log.debug("sending HOD command " + cmd.toString());
+ // print all the information provided by HOD
+ try {
+ BufferedReader br = new BufferedReader(new InputStreamReader(hodProcess.getErrorStream()));
+ String msg;
+ while ((msg = br.readLine()) != null)
+ log.info(msg);
+ br.close();
+ } catch(IOException ioe) {}
+
+ // for remote connection we need to bring the file locally
+ if (!server.equals(LOCAL))
+ hodConfDir = copyHadoopConfLocally(server);
- StringBuffer sb = new StringBuffer();
- int c;
- String hdfsUI = null;
- String mapredUI = null;
String hdfs = null;
String mapred = null;
- String hadoopConf = null;
+ String hadoopConf = hodConfDir + "/hadoop-site.xml";
- ParsingState current = ParsingState.NOTHING;
+ log.info ("Hadoop configuration file: " + hadoopConf);
- while((c = is.read()) != -1 && mapred == null) {
- if (c == '\n' || c == '\r') {
- switch(current) {
- case HDFSUI:
- hdfsUI = sb.toString().trim();
- log.info("HDFS Web UI: " + hdfsUI);
- break;
- case HDFS:
- hdfs = sb.toString().trim();
- log.info("HDFS: " + hdfs);
- break;
- case MAPREDUI:
- mapredUI = sb.toString().trim();
- log.info("JobTracker Web UI: " + mapredUI);
- break;
- case MAPRED:
- mapred = sb.toString().trim();
- log.info("JobTracker: " + mapred);
- break;
- case HADOOPCONF:
- hadoopConf = sb.toString().trim();
- log.info("HadoopConf: " + hadoopConf);
- break;
- }
- current = ParsingState.NOTHING;
- sb = new StringBuffer();
- }
- sb.append((char)c);
- if (sb.indexOf("hdfsUI:") != -1) {
- current = ParsingState.HDFSUI;
- sb = new StringBuffer();
- }
- else if (sb.indexOf("hdfs:") != -1) {
- current = ParsingState.HDFS;
- sb = new StringBuffer();
- }
- else if (sb.indexOf("mapredUI:") != -1) {
- current = ParsingState.MAPREDUI;
- sb = new StringBuffer();
- }
- else if (sb.indexOf("mapred:") != -1) {
- current = ParsingState.MAPRED;
- sb = new StringBuffer();
- }
- else if (sb.indexOf("hadoopConf:") != -1) {
- current = ParsingState.HADOOPCONF;
- sb = new StringBuffer();
- }
- }
-
- hdfsUI = fixUpDomain(hdfsUI);
- hdfs = fixUpDomain(hdfs);
- mapredUI = fixUpDomain(mapredUI);
- mapred = fixUpDomain(mapred);
+ JobConf jobConf = new JobConf(hadoopConf);
+ jobConf.addResource("pig-cluster-hadoop-site.xml");
+
+ // We need to load the properties from the hadoop configuration
+ // file we just found in the hod dir. We want these to override
+ // any existing properties we have.
+ if (jobConf != null) {
+ Iterator<Map.Entry<String, String>> iter = jobConf.iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, String> entry = iter.next();
+ properties.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ hdfs = properties.getProperty(FILE_SYSTEM_LOCATION);
+ if (hdfs == null)
+ throw new ExecException("Missing fs.default.name from hadoop configuration");
+ log.info("HDFS: " + hdfs);
+
+ mapred = properties.getProperty(JOB_TRACKER_LOCATION);
+ if (mapred == null)
+ throw new ExecException("Missing mapred.job.tracker from hadoop configuration");
+ log.info("JobTracker: " + mapred);
+
+ // this is no longer needed as the hadoop-site.xml given to us by HOD
+ // contains data in the correct format
+ // hdfs = fixUpDomain(hdfs, properties);
+ // mapred = fixUpDomain(mapred, properties);
hodHDFS = hdfs;
hodMapRed = mapred;
- if (hadoopConf != null) {
- JobConf jobConf = new JobConf(hadoopConf);
- jobConf.addResource("pig-cluster-hadoop-site.xml");
-
- conf = new HConfiguration(jobConf);
-
- // make sure that files on class path are used
- System.out.println("Job Conf = " + conf);
- System.out.println("dfs.block.size= " + conf.get("dfs.block.size"));
- System.out.println("ipc.client.timeout= " + conf.get("ipc.client.timeout"));
- System.out.println("mapred.child.java.opts= " + conf.get("mapred.child.java.opts"));
- }
- else {
- throw new IOException("Missing Hadoop configuration file");
- }
return new String[] {hdfs, mapred};
}
catch (Exception e) {
@@ -460,13 +438,117 @@
}
}
- private String fixUpDomain(String hostPort) throws UnknownHostException {
- String parts[] = hostPort.split(":");
- if (parts[0].indexOf('.') == -1) {
- parts[0] = parts[0] + ".inktomisearch.com";
+ private synchronized void closeHod(String server){
+ if (hodProcess == null)
+ return;
+
+ // hod deallocate format: hod deallocate -d <conf dir>
+ String[] cmdarray = new String[4];
+ cmdarray[0] = "hod";
+ cmdarray[1] = "deallocate";
+ cmdarray[2] = "-d";
+ if (remoteHodConfDir != null)
+ cmdarray[3] = remoteHodConfDir;
+ else
+ cmdarray[3] = hodConfDir;
+
+ log.info("Disconnecting from HOD...");
+ log.debug("Disconnect command: " + cmdToString(cmdarray));
+
+ try {
+ Process p = runCommand(server, cmdarray);
+ } catch (Exception e) {
+ log.warn("Failed to disconnect from HOD; error: " + e.getMessage());
+ } finally {
+ if (remoteHodConfDir != null)
+ deleteDir(server, remoteHodConfDir);
+ deleteDir(LOCAL, hodConfDir);
+ }
+
+ hodProcess = null;
+ }
+
+ private String copyHadoopConfLocally(String server) throws ExecException {
+ String localDir = createTempDir(LOCAL);
+ String remoteFile = new String(hodConfDir + "/hadoop-site.xml");
+ String localFile = new String(localDir + "/hadoop-site.xml");
+
+ remoteHodConfDir = hodConfDir;
+
+ String[] cmdarray = new String[2];
+ cmdarray[0] = "cat";
+ cmdarray[1] = remoteFile;
+
+ Process p = runCommand(server, cmdarray);
+
+ BufferedWriter bw;
+ try {
+ bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(localFile)));
+ } catch (Exception e){
+ throw new ExecException("Failed to create local hadoop file " + localFile, e);
+ }
+
+ try {
+ BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()));
+ String line;
+ while ((line = br.readLine()) != null){
+ bw.write(line, 0, line.length());
+ bw.newLine();
+ }
+ br.close();
+ bw.close();
+ } catch (Exception e){
+ throw new ExecException("Failed to copy data to local hadoop file " + localFile, e);
+ }
+
+ return localDir;
+ }
+
+ private String cmdToString(String[] cmdarray) {
+ StringBuilder cmd = new StringBuilder();
+
+ for (int i = 0; i < cmdarray.length; i++) {
+ cmd.append(cmdarray[i]);
+ cmd.append(' ');
+ }
+
+ return cmd.toString();
+ }
+ private Process runCommand(String server, String[] cmdarray) throws ExecException {
+ Process p;
+ try {
+ if (server.equals(LOCAL)) {
+ p = Runtime.getRuntime().exec(cmdarray);
+ }
+ else {
+ SSHSocketImplFactory fac = SSHSocketImplFactory.getFactory(server);
+ p = fac.ssh(cmdToString(cmdarray));
+ }
+
+ //this should return as soon as the connection is shut down
+ int rc = p.waitFor();
+ if (rc != 0) {
+ String errMsg = new String();
+ try {
+ BufferedReader br = new BufferedReader(new InputStreamReader(p.getErrorStream()));
+ errMsg = br.readLine();
+ br.close();
+ } catch (IOException ioe) {}
+ StringBuilder msg = new StringBuilder("Failed to run command ");
+ msg.append(cmdToString(cmdarray));
+ msg.append(" on server ");
+ msg.append(server);
+ msg.append("; return code: ");
+ msg.append(rc);
+ msg.append("; error: ");
+ msg.append(errMsg);
+ throw new ExecException(msg.toString());
+ }
+ } catch (Exception e){
+ throw new ExecException(e);
}
- InetAddress.getByName(parts[0]);
- return parts[0] + ":" + parts[1];
+
+ return p;
}
private FileSpec checkLeafIsStore(PhysicalPlan plan) throws ExecException {
@@ -491,8 +573,145 @@
throw new ExecException(e);
}
}
+ private void deleteDir(String server, String dir) {
+ if (server.equals(LOCAL)){
+ File path = new File(dir);
+ deleteLocalDir(path);
+ }
+ else {
+ // send rm command over ssh
+ String[] cmdarray = new String[3];
+ cmdarray[0] = "rm";
+ cmdarray[1] = "-rf";
+ cmdarray[2] = dir;
+
+ try{
+ Process p = runCommand(server, cmdarray);
+ }catch(Exception e){
+ log.warn("Failed to remove HOD configuration directory - " + dir);
+ }
+ }
+ }
+
+ private void deleteLocalDir(File path){
+ File[] files = path.listFiles();
+ int i;
+ for (i = 0; i < files.length; i++){
+ if (files[i].isHidden())
+ continue;
+ if (files[i].isFile())
+ files[i].delete();
+ else if (files[i].isDirectory())
+ deleteLocalDir(files[i]);
+ }
+
+ path.delete();
+ }
+
+ private String fixUpDomain(String hostPort,Properties properties) throws UnknownHostException {
+ URI uri = null;
+ try {
+ uri = new URI(hostPort);
+ } catch (URISyntaxException use) {
+ throw new RuntimeException("Illegal hostPort: " + hostPort);
+ }
+
+ String hostname = uri.getHost();
+ int port = uri.getPort();
+
+ // Parse manually if hostPort wasn't a non-opaque URI
+ // e.g. hostPort is "myhost:myport"
+ if (hostname == null || port == -1) {
+ String parts[] = hostPort.split(":");
+ hostname = parts[0];
+ port = Integer.valueOf(parts[1]);
+ }
+
+ if (hostname.indexOf('.') == -1) {
+ //jz: fall back to the system property because this is not handled in Main
+ String domain = properties.getProperty("cluster.domain",System.getProperty("cluster.domain"));
+ if (domain == null)
+ throw new RuntimeException("Missing cluster.domain property!");
+ hostname = hostname + "." + domain;
+ }
+ InetAddress.getByName(hostname);
+ return hostname + ":" + Integer.toString(port);
+ }
+
+ // create temp dir to store hod output; removed on exit
+ // format: <tempdir>/PigHod.<host name>.<user name>.<nanosecondts>
+ private String createTempDir(String server) throws ExecException {
+ StringBuilder tempDirPrefix = new StringBuilder ();
+
+ if (server.equals(LOCAL))
+ tempDirPrefix.append(System.getProperty("java.io.tmpdir"));
+ else
+ // for remote access we assume /tmp as temp dir
+ tempDirPrefix.append("/tmp");
+
+ tempDirPrefix.append("/PigHod.");
+ try {
+ tempDirPrefix.append(InetAddress.getLocalHost().getHostName());
+ tempDirPrefix.append(".");
+ } catch (UnknownHostException e) {}
+
+ tempDirPrefix.append(System.getProperty("user.name"));
+ tempDirPrefix.append(".");
+ String path;
+ do {
+ path = tempDirPrefix.toString() + System.nanoTime();
+ } while (!createDir(server, path));
+
+ return path;
+ }
+
+ private boolean createDir(String server, String dir) throws ExecException{
+ if (server.equals(LOCAL)){
+ // create local directory
+ File tempDir = new File(dir);
+ boolean success = tempDir.mkdir();
+ if (!success)
+ log.warn("Failed to create HOD configuration directory - " + dir + ". Retrying ...");
+
+ return success;
+ }
+ else {
+ String[] cmdarray = new String[2];
+ cmdarray[0] = "mkdir ";
+ cmdarray[1] = dir;
+
+ try{
+ Process p = runCommand(server, cmdarray);
+ }
+ catch(ExecException e){
+ log.warn("Failed to create HOD configuration directory - " + dir + "Retrying...");
+ return false;
+ }
+
+ return true;
+ }
+ }
-
+ // returns number of nodes based on -m option in hodParams if present;
+ // otherwise, default is used; -m is removed from the params
+ int getNumNodes(StringBuilder hodParams) {
+ String val = hodParams.toString();
+ int startPos = val.indexOf("-m ");
+ if (startPos == -1)
+ startPos = val.indexOf("-m\t");
+ if (startPos != -1) {
+ int curPos = startPos + 3;
+ int len = val.length();
+ while (curPos < len && Character.isWhitespace(val.charAt(curPos))) curPos ++;
+ int numStartPos = curPos;
+ while (curPos < len && Character.isDigit(val.charAt(curPos))) curPos ++;
+ int nodes = Integer.parseInt(val.substring(numStartPos, curPos));
+ hodParams.delete(startPos, curPos);
+ return nodes;
+ } else {
+ return Integer.getInteger("hod.nodes", 15);
+ }
+ }
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Fri Jul 18 18:43:31 2008
@@ -94,13 +94,12 @@
}
public Properties getConfiguration() throws ExecException {
- Properties conf = new Properties();
- return conf;
+ return this.pigContext.getProperties();
}
public void updateConfiguration(Properties newConfiguration)
throws ExecException {
- ;
+ // there is nothing to do here.
}
public Map<String, Object> getStatistics() throws ExecException {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java Fri Jul 18 18:43:31 2008
@@ -63,7 +63,8 @@
private transient final Log log = LogFactory.getLog(getClass());
- private static final String JOB_NAME_PREFIX= "PigLatin";
+ public static final String JOB_NAME = "jobName";
+ public static final String JOB_NAME_PREFIX= "PigLatin";
/* NOTE: we only serialize some of the stuff
*
@@ -75,7 +76,7 @@
private ExecType execType;;
// configuration for connecting to hadoop
- transient private Properties conf = new Properties();
+ private Properties conf = new Properties();
// extra jar files that are needed to run a job
transient public List<URL> extraJars = new LinkedList<URL>();
@@ -94,6 +95,8 @@
private String jobName = JOB_NAME_PREFIX; // can be overwritten by users
+ private Properties properties;
+
/**
* a table mapping function names to function specs.
*/
@@ -101,16 +104,16 @@
private static ArrayList<String> packageImportList = new ArrayList<String>();
- public boolean debug = true;
+ public boolean debug = true;
public PigContext() {
- this(ExecType.MAPREDUCE);
+ this(ExecType.MAPREDUCE, new Properties());
}
- public PigContext(ExecType execType){
+ public PigContext(ExecType execType, Properties properties){
this.execType = execType;
+ this.properties = properties;
- initProperties();
String pigJar = JarManager.findContainingJar(Main.class);
String hadoopJar = JarManager.findContainingJar(FileSystem.class);
if (pigJar != null) {
@@ -128,48 +131,14 @@
packageImportList.add("com.yahoo.pig.yst.sds.ULT.");
packageImportList.add("org.apache.pig.impl.builtin.");
}
-
- private void initProperties() {
- Properties fileProperties = new Properties();
-
- try{
- // first read the properties in the jar file
- InputStream pis = MapReduceLauncher.class.getClassLoader().getResourceAsStream("properties");
- if (pis != null) {
- fileProperties.load(pis);
- }
-
- //then read the properties in the home directory
- try{
- pis = new FileInputStream(System.getProperty("user.home") + "/.pigrc");
- }catch(IOException e){}
- if (pis != null) {
- fileProperties.load(pis);
- }
- }catch (IOException e){
- log.error(e);
- throw new RuntimeException(e);
- }
-
- //Now set these as system properties only if they are not already defined.
- for (Object o: fileProperties.keySet()){
- String propertyName = (String)o;
- log.debug("Found system property " + propertyName + " in .pigrc");
- if (System.getProperty(propertyName) == null){
- System.setProperty(propertyName, fileProperties.getProperty(propertyName));
- log.debug("Setting system property " + propertyName);
- }
- }
- }
public void connect() throws ExecException {
- try {
- switch (execType) {
+ switch (execType) {
case LOCAL:
{
lfs = new HDataStorage(URI.create("file:///"),
- new Configuration());
+ new Properties());
dfs = lfs;
executionEngine = new LocalExecutionEngine(this);
@@ -185,7 +154,7 @@
dfs = executionEngine.getDataStorage();
lfs = new HDataStorage(URI.create("file:///"),
- new Configuration());
+ new Properties());
}
break;
@@ -193,19 +162,8 @@
{
throw new ExecException("Unkown execType: " + execType);
}
- }
}
- catch (IOException e) {
- ;
- }
- }
-
- public void setJobName(String name){
- jobName = JOB_NAME_PREFIX + ":" + name;
- }
- public String getJobName(){
- return jobName;
}
public void setJobtrackerLocation(String newLocation) {
@@ -278,7 +236,7 @@
throw WrappedIOException.wrap("Unable to copy " + src + " to " + dst + (localDst ? "locally" : ""), e);
}
- srcElement.copy(dstElement, conf,false);
+ srcElement.copy(dstElement, this.properties, false);
}
public ExecutionEngine getExecutionEngine() {
@@ -293,8 +251,21 @@
return lfs;
}
+ /**
+ * Provides configuration information.
+ *
+ * @return - information about the configuration used to connect to
+ * execution engine
+ */
+ public Properties getProperties() {
+ return this.properties;
+ }
+
+ /**
+ * @deprecated use {@link #getProperties()} instead
+ */
public Properties getConf() {
- return conf;
+ return getProperties();
}
/**
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java Fri Jul 18 18:43:31 2008
@@ -29,6 +29,7 @@
import java.util.Iterator;
import java.util.Random;
import java.util.Stack;
+import java.util.Properties ;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -41,6 +42,7 @@
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.WrappedIOException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
public class FileLocalizer {
private static final Log log = LogFactory.getLog(FileLocalizer.class);
@@ -156,8 +158,8 @@
}
*/
- public static InputStream openDFSFile(String fileName, JobConf conf) throws IOException{
- DataStorage dds = new HDataStorage(conf);
+ public static InputStream openDFSFile(String fileName, Properties properties) throws IOException{
+ DataStorage dds = new HDataStorage(properties);
ElementDescriptor elem = dds.asElement(fileName);
return openDFSFile(elem);
}
@@ -318,20 +320,24 @@
initialized = true;
relativeRoot = pigContext.getDfs().asContainer("/tmp/temp" + r.nextInt());
toDelete.push(relativeRoot);
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- while (!toDelete.empty()) {
- try {
- ElementDescriptor elem = toDelete.pop();
- elem.delete();
- }
- catch (IOException e) {
- log.error(e);
- }
- }
- }
- });
+ // Runtime.getRuntime().addShutdownHook(new Thread() {
+ // @Override
+ // public void run() {
+ // deleteTempFiles();
+ // }
+ //});
+ }
+ }
+
+ public static void deleteTempFiles() {
+ while (!toDelete.empty()) {
+ try {
+ ElementDescriptor elem = toDelete.pop();
+ elem.delete();
+ }
+ catch (IOException e) {
+ log.error(e);
+ }
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java Fri Jul 18 18:43:31 2008
@@ -12,6 +12,7 @@
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.impl.PigContext;
@@ -41,7 +42,7 @@
comp.compile();
ExecutionEngine exe = pc.getExecutionEngine();
- Configuration conf = ((HConfiguration)exe.getConfiguration()).getConfiguration();
+ Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
JobClient jobClient = ((HExecutionEngine)exe).getJobClient();
MROperPlan mrp = comp.getMRPlan();
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java Fri Jul 18 18:43:31 2008
@@ -27,6 +27,7 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.DataBag;
@@ -52,7 +53,7 @@
throw new RuntimeException("Sort paritioner used but no quantiles found");
try{
- InputStream is = FileLocalizer.openDFSFile(quantilesFile,job);
+ InputStream is = FileLocalizer.openDFSFile(quantilesFile,ConfigurationUtil.toProperties(job));
BinStorage loader = new BinStorage();
loader.bindTo(quantilesFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
Added: incubator/pig/branches/types/src/org/apache/pig/impl/util/ConfigurationValidator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/ConfigurationValidator.java?rev=678090&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/util/ConfigurationValidator.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/util/ConfigurationValidator.java Fri Jul 18 18:43:31 2008
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util;
+
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class ConfigurationValidator {
+
+ private final static Log log = LogFactory.getLog(PropertiesUtil.class);
+ /***
+ * All pig configurations should be validated in here before use
+ * @param properties
+ */
+
+ public static void validatePigProperties(Properties properties) {
+ ensureLongType(properties, "pig.spill.size.threshold", 0L) ;
+ ensureLongType(properties, "pig.spill.gc.activation.size", Long.MAX_VALUE) ;
+ }
+
+ private static void ensureLongType(Properties properties,
+ String key,
+ long defaultValue) {
+ String str = properties.getProperty(key) ;
+ if (str != null) {
+ try {
+ Long.parseLong(str) ;
+ }
+ catch (NumberFormatException nfe) {
+ log.error(str + " has to be parsable to long") ;
+ properties.setProperty(key, Long.toString(defaultValue)) ;
+ }
+ }
+ else {
+ properties.setProperty(key, Long.toString(defaultValue)) ;
+ }
+ }
+}
Modified: incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java Fri Jul 18 18:43:31 2008
@@ -110,7 +110,7 @@
mDfs = mPigServer.getPigContext().getDfs();
mLfs = mPigServer.getPigContext().getLfs();
- mConf = mPigServer.getPigContext().getConf();
+ mConf = mPigServer.getPigContext().getProperties();
// TODO: this violates the abstraction layer decoupling between
// front end and back end and needs to be changed.
Modified: incubator/pig/branches/types/test/org/apache/pig/test/MiniCluster.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/MiniCluster.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/MiniCluster.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/MiniCluster.java Fri Jul 18 18:43:31 2008
@@ -18,6 +18,7 @@
package org.apache.pig.test;
import java.io.*;
+import java.util.Properties;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.mapred.MiniMRCluster;
@@ -25,6 +26,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
/**
* This class builds a single instance of itself with the Singleton
@@ -98,5 +100,9 @@
}
if (m_dfs != null) { m_dfs.shutdown(); }
if (m_mr != null) { m_mr.shutdown(); }
+ }
+
+ public Properties getProperties() {
+ return ConfigurationUtil.toProperties(m_conf);
}
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java Fri Jul 18 18:43:31 2008
@@ -24,8 +24,10 @@
import junit.framework.TestCase;
+import org.junit.Before;
import org.junit.Test;
+import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataByteArray;
@@ -34,12 +36,21 @@
public class TestAlgebraicEval extends TestCase {
- private String initString = "mapreduce";
+ private int LOOP_COUNT = 1024;
+
+
+ private PigServer pig;
+
+ @Before
+ @Override
+ protected void setUp() throws Exception {
+ pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ }
+
+
MiniCluster cluster = MiniCluster.buildCluster();
@Test
public void testGroupCountWithMultipleFields() throws Throwable {
- int LOOP_COUNT = 1024;
- PigServer pig = new PigServer(initString);
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
for(int i = 0; i < LOOP_COUNT; i++) {
@@ -69,8 +80,6 @@
@Test
public void testSimpleCount() throws Exception {
- long LOOP_COUNT = 1024;
- PigServer pig = new PigServer(initString);
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
for(int i = 0; i < LOOP_COUNT; i++) {
@@ -89,8 +98,6 @@
@Test
public void testGroupCount() throws Throwable {
- long LOOP_COUNT = 1024;
- PigServer pig = new PigServer(initString);
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
for(int i = 0; i < LOOP_COUNT; i++) {
@@ -109,8 +116,6 @@
@Test
public void testGroupReorderCount() throws Throwable {
- long LOOP_COUNT = 1024;
- PigServer pig = new PigServer(initString);
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
for(int i = 0; i < LOOP_COUNT; i++) {
@@ -131,8 +136,6 @@
@Test
public void testGroupUniqueColumnCount() throws Throwable {
- int LOOP_COUNT = 1024;
- PigServer pig = new PigServer(initString);
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
long groupsize = 0;
@@ -160,8 +163,6 @@
@Test
public void testGroupDuplicateColumnCount() throws Throwable {
- int LOOP_COUNT = 1024;
- PigServer pig = new PigServer(initString);
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
long groupsize = 0;
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestCombiner.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestCombiner.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestCombiner.java Fri Jul 18 18:43:31 2008
@@ -24,10 +24,12 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Properties;
import org.junit.Test;
import junit.framework.TestCase;
+import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
@@ -41,14 +43,14 @@
@Test
public void testOnCluster() throws Exception {
// run the test on cluster
- runTest(new PigServer("mapreduce"));
+ runTest(new PigServer(ExecType.MAPREDUCE, cluster.getProperties()));
}
@Test
public void testLocal() throws Exception {
// run the test locally
- runTest(new PigServer("local"));
+ runTest(new PigServer(ExecType.LOCAL, new Properties()));
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestCompressedFiles.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestCompressedFiles.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestCompressedFiles.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestCompressedFiles.java Fri Jul 18 18:43:31 2008
@@ -28,6 +28,7 @@
import org.junit.Test;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.DIFF;
import junit.framework.TestCase;
@@ -72,7 +73,7 @@
@Test
public void testCompressed1() throws Throwable {
- PigServer pig = new PigServer("mapreduce");
+ PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
pig.registerQuery("A = foreach (cogroup (load 'file:"+gzFile+"') by $1, (load 'file:"+datFile + "') by $1) generate flatten( " + DIFF.class.getName() + "($1.$1,$2.$1)) ;");
Iterator it = pig.openIterator("A");
boolean success = true;
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Fri Jul 18 18:43:31 2008
@@ -35,6 +35,7 @@
import org.junit.Test;
import org.apache.pig.EvalFunc;
+import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.BinStorage;
@@ -48,14 +49,16 @@
public class TestEvalPipeline extends TestCase {
- String initString = "mapreduce";
MiniCluster cluster = MiniCluster.buildCluster();
+ private PigServer pigServer;
TupleFactory mTf = TupleFactory.getInstance();
@Before
- public void setUp(){
+ @Override
+ public void setUp() throws Exception{
FileLocalizer.setR(new Random());
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
}
static public class MyBagFunction extends EvalFunc<DataBag>{
@@ -84,7 +87,6 @@
@Test
public void testFunctionInsideFunction() throws Exception{
- PigServer pigServer = new PigServer(initString);
File f1 = createFile(new String[]{"a:1","b:1","a:1"});
@@ -100,8 +102,7 @@
@Test
public void testJoin() throws Exception{
- PigServer pigServer = new PigServer(initString);
-
+
File f1 = createFile(new String[]{"a:1","b:1","a:1"});
File f2 = createFile(new String[]{"b","b","a"});
@@ -122,7 +123,6 @@
@Test
public void testDriverMethod() throws Exception{
- PigServer pigServer = new PigServer(initString);
File f = File.createTempFile("tmp", "");
PrintWriter pw = new PrintWriter(f);
pw.println("a");
@@ -145,7 +145,6 @@
@Test
public void testMapLookup() throws Exception {
- PigServer pigServer = new PigServer(initString);
DataBag b = BagFactory.getInstance().newDefaultBag();
Map<Object, Object> colors = new HashMap<Object, Object>();
colors.put("apple","red");
@@ -246,7 +245,6 @@
@Test
public void testBagFunctionWithFlattening() throws Exception{
- PigServer pigServer = new PigServer(initString);
File queryLogFile = createFile(
new String[]{
"stanford\tdeer\tsighting",
@@ -312,18 +310,17 @@
}
ps.close();
- PigServer pig = new PigServer(initString);
- String tmpOutputFile = FileLocalizer.getTemporaryPath(null, pig.getPigContext()).toString();
- pig.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+ String tmpOutputFile = FileLocalizer.getTemporaryPath(null, pigServer.getPigContext()).toString();
+ pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "';");
if (eliminateDuplicates){
- pig.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) PARALLEL 10;");
+ pigServer.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) PARALLEL 10;");
}else{
- pig.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
+ pigServer.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
}
- pig.store("B", tmpOutputFile);
+ pigServer.store("B", tmpOutputFile);
- pig.registerQuery("A = load '" + tmpOutputFile + "';");
- Iterator<Tuple> iter = pig.openIterator("A");
+ pigServer.registerQuery("A = load '" + tmpOutputFile + "';");
+ Iterator<Tuple> iter = pigServer.openIterator("A");
String last = "";
HashSet<Integer> seen = new HashSet<Integer>();
if(!iter.hasNext()) fail("No Results obtained");
@@ -356,11 +353,10 @@
}
ps.close();
- PigServer pig = new PigServer(initString);
String tmpOutputFile = FileLocalizer.getTemporaryPath(null,
- pig.getPigContext()).toString();
- pig.registerQuery("A = LOAD 'file:" + tmpFile + "';");
- pig.registerQuery("B = group A by $0;");
+ pigServer.getPigContext()).toString();
+ pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+ pigServer.registerQuery("B = group A by $0;");
String query = "C = foreach B {"
+ "C1 = filter A by $0 > -1;"
+ "C2 = distinct C1;"
@@ -368,8 +364,8 @@
+ "generate (int)group," + Identity.class.getName() +"(*), COUNT(C2), SUM(C2.$1)," + TitleNGrams.class.getName() + "(C3), MAX(C3.$1);"
+ "};";
- pig.registerQuery(query);
- Iterator<Tuple> iter = pig.openIterator("C");
+ pigServer.registerQuery(query);
+ Iterator<Tuple> iter = pigServer.openIterator("C");
if(!iter.hasNext()) fail("No output found");
int numIdentity = 0;
while(iter.hasNext()){
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestFilterOpNumeric.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestFilterOpNumeric.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestFilterOpNumeric.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestFilterOpNumeric.java Fri Jul 18 18:43:31 2008
@@ -22,10 +22,12 @@
import java.io.PrintStream;
import java.util.Iterator;
+import org.junit.Before;
import org.junit.Test;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
@@ -39,10 +41,17 @@
private static int LOOP_COUNT = 1024;
private String initString = "mapreduce";
MiniCluster cluster = MiniCluster.buildCluster();
+ private PigServer pig;
+ @Before
+ @Override
+ protected void setUp() throws Exception {
+ pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ }
+
+
@Test
public void testNumericEq() throws Throwable {
- PigServer pig = new PigServer(initString);
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
for(int i = 0; i < LOOP_COUNT; i++) {
@@ -73,7 +82,6 @@
@Test
public void testNumericNeq() throws Throwable {
- PigServer pig = new PigServer(initString);
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
for(int i = 0; i < LOOP_COUNT; i++) {
@@ -100,7 +108,6 @@
@Test
public void testNumericGt() throws Throwable {
- PigServer pig = new PigServer(initString);
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
for(int i = 0; i < LOOP_COUNT; i++) {
@@ -130,7 +137,6 @@
@Test
public void testBinCond() throws Throwable {
- PigServer pig = new PigServer(initString);
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
for(int i = 0; i < LOOP_COUNT; i++) {
@@ -159,7 +165,6 @@
@Test
public void testNestedBinCond() throws Throwable {
- PigServer pig = new PigServer(initString);
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
for(int i = 0; i < LOOP_COUNT; i++) {
@@ -185,7 +190,6 @@
@Test
public void testNumericLt() throws Throwable {
- PigServer pig = new PigServer(initString);
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
for(int i = 0; i < LOOP_COUNT; i++) {
@@ -215,7 +219,6 @@
@Test
public void testNumericGte() throws Throwable {
- PigServer pig = new PigServer(initString);
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
for(int i = 0; i < LOOP_COUNT; i++) {
@@ -245,7 +248,6 @@
@Test
public void testNumericLte() throws Throwable {
- PigServer pig = new PigServer(initString);
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
for(int i = 0; i < LOOP_COUNT; i++) {