Posted to commits@pig.apache.org by ol...@apache.org on 2008/02/01 04:11:39 UTC
svn commit: r617338 [2/5] - in /incubator/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/ src/org/apache/pig/backend/datastorage/
src/org/apache/pig/backend/executionengine/
src/org/apache/pig/backend/hadoop/ src/org/apache/pig/backend/ha...
Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java?rev=617338&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java Thu Jan 31 19:11:28 2008
@@ -0,0 +1,191 @@
+
+package org.apache.pig.backend.hadoop.datastorage;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.pig.backend.datastorage.*;
+
+public abstract class HPath implements ElementDescriptor {
+
+ protected Path path;
+ protected HDataStorage fs;
+
+ public HPath(HDataStorage fs, Path parent, Path child) {
+ this.path = new Path(parent, child);
+ this.fs = fs;
+ }
+
+ public HPath(HDataStorage fs, String parent, String child) {
+ this(fs, new Path(parent), new Path(child));
+ }
+
+ public HPath(HDataStorage fs, Path parent, String child) {
+ this(fs, parent, new Path(child));
+ }
+
+ public HPath(HDataStorage fs, String parent, Path child) {
+ this(fs, new Path(parent), child);
+ }
+
+ public HPath(HDataStorage fs, String pathString) {
+ this(fs, new Path(pathString));
+ }
+
+ public HPath(HDataStorage fs, Path path) {
+ this.path = path;
+ this.fs = fs;
+ }
+
+ @Override
+ public DataStorage getDataStorage() {
+ return fs;
+ }
+
+ @Override
+ public abstract OutputStream create(Properties configuration)
+ throws IOException;
+
+ @Override
+ public void copy(ElementDescriptor dstName,
+ Properties dstConfiguration,
+ boolean removeSrc)
+ throws IOException {
+ FileSystem srcFS = this.fs.getHFS();
+ FileSystem dstFS = ((HPath)dstName).fs.getHFS();
+
+ Path srcPath = this.path;
+ Path dstPath = ((HPath)dstName).path;
+
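+        // Note: FileUtil.copy is invoked below with deleteSource hard-coded to false,
+        // so the removeSrc flag passed to this method is not acted upon here.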
+ boolean result = FileUtil.copy(srcFS,
+ srcPath,
+ dstFS,
+ dstPath,
+ false,
+ new Configuration());
+
+ if (!result) {
+ throw new IOException("Failed to copy from: " + this.toString() +
+ " to: " + dstName.toString());
+ }
+ }
+
+ @Override
+ public abstract InputStream open() throws IOException;
+
+ @Override
+ public abstract SeekableInputStream sopen() throws IOException;
+
+ @Override
+ public boolean exists() throws IOException {
+ return fs.getHFS().exists(path);
+ }
+
+ @Override
+ public void rename(ElementDescriptor newName)
+ throws IOException {
+ if (newName != null) {
+ fs.getHFS().rename(path, ((HPath)newName).path);
+ }
+ }
+
+ @Override
+ public void delete() throws IOException {
+ // the file is removed and not placed in the trash bin
+ fs.getHFS().delete(path);
+ }
+
+ @Override
+ public Properties getConfiguration() throws IOException {
+ HConfiguration props = new HConfiguration();
+
+ long blockSize = fs.getHFS().getFileStatus(path).getBlockSize();
+
+ short replication = fs.getHFS().getFileStatus(path).getReplication();
+
+ props.setProperty(BLOCK_SIZE_KEY, (new Long(blockSize)).toString());
+ props.setProperty(BLOCK_REPLICATION_KEY, (new Short(replication)).toString());
+
+ return props;
+ }
+
+ @Override
+ public void updateConfiguration(Properties newConfig) throws IOException {
+ if (newConfig == null) {
+ return;
+ }
+
+ String blkReplStr = newConfig.getProperty(BLOCK_REPLICATION_KEY);
+
+ fs.getHFS().setReplication(path,
+ new Short(blkReplStr).shortValue());
+ }
+
+ @Override
+ public Map<String, Object> getStatistics() throws IOException {
+ HashMap<String, Object> props = new HashMap<String, Object>();
+
+ Long length = new Long(fs.getHFS().getFileStatus(path).getLen());
+
+ Long modificationTime = new Long(fs.getHFS().getFileStatus(path).
+ getModificationTime());
+
+ props.put(LENGTH_KEY, length.toString());
+ props.put(MODIFICATION_TIME_KEY, modificationTime.toString());
+
+ return props;
+ }
+
+ @Override
+ public OutputStream create() throws IOException {
+ return create(null);
+ }
+
+ @Override
+ public void copy(ElementDescriptor dstName,
+ boolean removeSrc)
+ throws IOException {
+ copy(dstName, null, removeSrc);
+ }
+
+ public Path getPath() {
+ return path;
+ }
+
+ public FileSystem getHFS() {
+ return fs.getHFS();
+ }
+
+ @Override
+ public String toString() {
+ return path.toString();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (! (obj instanceof HPath)) {
+ return false;
+ }
+
+ return this.path.equals(((HPath)obj).path);
+ }
+
+ @Override
+ public int compareTo(ElementDescriptor other) {
+ return path.compareTo(((HPath)other).path);
+ }
+
+ @Override
+ public int hashCode() {
+ return this.path.hashCode();
+ }
+}
Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java?rev=617338&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java Thu Jan 31 19:11:28 2008
@@ -0,0 +1,94 @@
+package org.apache.pig.backend.hadoop.datastorage;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import org.apache.pig.backend.datastorage.SeekableInputStream;
+
+public class HSeekableInputStream extends SeekableInputStream {
+
+ protected FSDataInputStream input;
+ protected long contentLength;
+
+ HSeekableInputStream(FSDataInputStream input,
+ long contentLength) {
+ this.input = input;
+ this.contentLength = contentLength;
+ }
+
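+    // The whence flag follows the usual lseek()-style convention: SEEK_SET is
+    // relative to the start of the stream, SEEK_CUR to the current position, and
+    // SEEK_END to the end of the content. For example (illustrative),
+    // seek(-10, FLAGS.SEEK_END) positions the stream ten bytes before the end,
+    // while seek(0, FLAGS.SEEK_CUR) leaves the position unchanged.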
+ @Override
+ public void seek(long offset, FLAGS whence) throws IOException {
+ long targetPos;
+
+ switch (whence) {
+ case SEEK_SET: {
+ targetPos = offset;
+ break;
+ }
+ case SEEK_CUR: {
+ targetPos = input.getPos() + offset;
+ break;
+ }
+ case SEEK_END: {
+ targetPos = contentLength + offset;
+ break;
+ }
+ default: {
+ throw new IOException("Invalid seek option: " + whence);
+ }
+ }
+
+ input.seek(targetPos);
+ }
+
+ @Override
+ public long tell() throws IOException {
+ return input.getPos();
+ }
+
+ @Override
+ public int read() throws IOException {
+ return input.read();
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return input.read(b);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len ) throws IOException {
+ return input.read(b, off, len);
+ }
+
+ @Override
+ public int available() throws IOException {
+ return input.available();
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ return input.skip(n);
+ }
+
+ @Override
+ public void close() throws IOException {
+ input.close();
+ }
+
+ @Override
+ public void mark(int readlimit) {
+ input.mark(readlimit);
+ }
+
+ @Override
+ public void reset() throws IOException {
+ input.reset();
+ }
+
+ @Override
+ public boolean markSupported() {
+ return input.markSupported();
+ }
+}
Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=617338&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu Jan 31 19:11:28 2008
@@ -0,0 +1,476 @@
+package org.apache.pig.backend.hadoop.executionengine;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketImplFactory;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Enumeration;
+
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.backend.executionengine.*;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
+import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+
+import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
+import org.apache.pig.backend.local.executionengine.LocalResult;
+import org.apache.pig.data.Tuple;
+
+import org.apache.log4j.Logger;
+
+import org.apache.pig.shock.SSHSocketImplFactory;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.JobSubmissionProtocol;
+import org.apache.hadoop.mapred.JobClient;
+
+
+public class HExecutionEngine implements ExecutionEngine {
+
+ protected PigContext pigContext;
+
+ protected Logger logger;
+ protected DataStorage ds;
+ protected HConfiguration conf;
+
+ protected JobSubmissionProtocol jobTracker;
+ protected JobClient jobClient;
+
+ // key: the operator key from the logical plan that originated the physical plan
+    // val: the operator key for the root of the physical plan
+ protected Map<OperatorKey, OperatorKey> logicalToPhysicalKeys;
+
+ protected Map<OperatorKey, ExecPhysicalOperator> physicalOpTable;
+
+    // map from LOGICAL key to info about the execution
+ protected Map<OperatorKey, MapRedResult> materializedResults;
+
+ public HExecutionEngine(PigContext pigContext,
+ Logger logger,
+ HConfiguration conf) {
+ this.pigContext = pigContext;
+ this.logger = logger;
+ this.conf = conf;
+ this.logicalToPhysicalKeys = new HashMap<OperatorKey, OperatorKey>();
+ this.physicalOpTable = new HashMap<OperatorKey, ExecPhysicalOperator>();
+ this.materializedResults = new HashMap<OperatorKey, MapRedResult>();
+
+ this.ds = null;
+
+ // to be set in the init method
+ this.jobTracker = null;
+ this.jobClient = null;
+ }
+
+ public JobClient getJobClient() {
+ return this.jobClient;
+ }
+
+ public Map<OperatorKey, MapRedResult> getMaterializedResults() {
+ return this.materializedResults;
+ }
+
+ public HExecutionEngine(PigContext pigContext, Logger logger) {
+ this(pigContext, logger, new HConfiguration(new JobConf()));
+ }
+
+ public Map<OperatorKey, ExecPhysicalOperator> getPhysicalOpTable() {
+ return this.physicalOpTable;
+ }
+
+
+ public DataStorage getDataStorage() {
+ return this.ds;
+ }
+
+ private void setJobtrackerLocation(String newLocation) {
+ conf.put("mapred.job.tracker", newLocation);
+ }
+
+ private void setFilesystemLocation(String newLocation) {
+ conf.put("fs.default.name", newLocation);
+ }
+
+ @Override
+ public void init() throws ExecException {
+ //First set the ssh socket factory
+ setSSHFactory();
+
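+        // Cluster endpoints come from system properties: "hod.server" (when set, a
+        // cluster is allocated through HOD), otherwise "cluster" for the job tracker
+        // and "namenode" for the file system, with default ports appended when none
+        // are given. A hypothetical invocation might therefore pass
+        // -Dcluster=jt.example.com:50020 -Dnamenode=nn.example.com:8020 on the java
+        // command line.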
+ String hodServer = System.getProperty("hod.server");
+
+ if (hodServer != null && hodServer.length() > 0) {
+ String hdfsAndMapred[] = doHod(hodServer);
+ setFilesystemLocation(hdfsAndMapred[0]);
+ setJobtrackerLocation(hdfsAndMapred[1]);
+ }
+ else {
+ String cluster = System.getProperty("cluster");
+ if (cluster != null && cluster.length() > 0) {
+ if(cluster.indexOf(':') < 0) {
+ cluster = cluster + ":50020";
+ }
+ setJobtrackerLocation(cluster);
+ }
+
+ String nameNode = System.getProperty("namenode");
+ if (nameNode!=null && nameNode.length() > 0) {
+ if(nameNode.indexOf(':') < 0) {
+ nameNode = nameNode + ":8020";
+ }
+ setFilesystemLocation(nameNode);
+ }
+ }
+
+ logger.info("Connecting to hadoop file system at: " + conf.get("fs.default.name"));
+
+ try {
+ ds = new HDataStorage(conf);
+ }
+ catch (IOException e) {
+ throw new ExecException("Failed to create DataStorage", e);
+ }
+
+ logger.info("Connecting to map-reduce job tracker at: " + conf.get("mapred.job.tracker"));
+
+ try {
+ jobTracker = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
+ JobSubmissionProtocol.versionID,
+ JobTracker.getAddress(conf.getConfiguration()),
+ conf.getConfiguration());
+ }
+ catch (IOException e) {
+ throw new ExecException("Failed to crate job tracker", e);
+ }
+
+ try {
+ jobClient = new JobClient(new JobConf(conf.getConfiguration()));
+ }
+ catch (IOException e) {
+ throw new ExecException("Failed to create job client", e);
+ }
+ }
+
+ @Override
+ public void close() throws ExecException {
+ ;
+ }
+
+ @Override
+ public Properties getConfiguration() throws ExecException {
+ return this.conf;
+ }
+
+ @Override
+ public void updateConfiguration(Properties newConfiguration)
+ throws ExecException {
+ Enumeration keys = newConfiguration.propertyNames();
+
+ while (keys.hasMoreElements()) {
+ Object obj = keys.nextElement();
+
+ if (obj instanceof String) {
+ String str = (String) obj;
+
+ conf.put(str, newConfiguration.get(str));
+ }
+ }
+ }
+
+ @Override
+ public Map<String, Object> getStatistics() throws ExecException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ExecPhysicalPlan compile(ExecLogicalPlan plan,
+ Properties properties)
+ throws ExecException {
+ return compile(new ExecLogicalPlan[] { plan },
+ properties);
+ }
+
+ @Override
+ public ExecPhysicalPlan compile(ExecLogicalPlan[] plans,
+ Properties properties)
+ throws ExecException {
+ if (plans == null) {
+ throw new ExecException("No Plans to compile");
+ }
+
+ OperatorKey physicalKey = null;
+ for (int i = 0; i < plans.length; ++i) {
+ ExecLogicalPlan curPlan = null;
+
+ curPlan = plans[ i ];
+
+ OperatorKey logicalKey = curPlan.getRoot();
+
+ physicalKey = logicalToPhysicalKeys.get(logicalKey);
+
+ if (physicalKey == null) {
+ try {
+ physicalKey = new MapreducePlanCompiler(pigContext).
+ compile(curPlan.getRoot(),
+ curPlan.getOpTable(),
+ this);
+ }
+ catch (IOException e) {
+ throw new ExecException("Failed to compile plan (" + i + ") " + logicalKey,
+ e);
+ }
+
+ logicalToPhysicalKeys.put(logicalKey, physicalKey);
+ }
+ }
+
+ return new MapRedPhysicalPlan(physicalKey, physicalOpTable);
+ }
+
+ @Override
+ public ExecJob execute(ExecPhysicalPlan plan)
+ throws ExecException {
+
+ POMapreduce pom = (POMapreduce) physicalOpTable.get(plan.getRoot());
+
+ MapReduceLauncher.initQueryStatus(pom.numMRJobs()); // initialize status, for bookkeeping purposes.
+ MapReduceLauncher.setConf(this.conf.getConfiguration());
+ MapReduceLauncher.setExecEngine(this);
+
+ // if the final operator is a MapReduce with no output file, then send to a temp
+ // file.
+ if (pom.outputFileSpec==null) {
+ try {
+ pom.outputFileSpec = new FileSpec(FileLocalizer.getTemporaryPath(null, pigContext).toString(),
+ BinStorage.class.getName());
+ }
+ catch (IOException e) {
+ throw new ExecException("Failed to obtain temp file for " + plan.getRoot().toString(), e);
+ }
+ }
+
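+        // open() is what actually submits the Hadoop job(s) (see POMapreduce.open);
+        // the getNext() loop below merely drains any chained inputs to completion.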
+ try {
+ pom.open();
+
+ Tuple t;
+ while ((t = (Tuple) pom.getNext()) != null) {
+ ;
+ }
+
+ pom.close();
+
+ this.materializedResults.put(pom.sourceLogicalKey,
+ new MapRedResult(pom.outputFileSpec,
+ pom.reduceParallelism));
+ }
+ catch (IOException e) {
+ throw new ExecException(e);
+ }
+
+ return new HJob(JOB_STATUS.COMPLETED, pigContext, pom.outputFileSpec);
+
+ }
+
+ @Override
+ public ExecJob submit(ExecPhysicalPlan plan) throws ExecException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Collection<ExecJob> runningJobs(Properties properties) throws ExecException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Collection<String> activeScopes() throws ExecException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void reclaimScope(String scope) throws ExecException {
+ throw new UnsupportedOperationException();
+ }
+
+ private void setSSHFactory(){
+ String g = System.getProperty("ssh.gateway");
+ if (g == null || g.length() == 0) return;
+ try {
+ Class clazz = Class.forName("org.apache.pig.shock.SSHSocketImplFactory");
+            SocketImplFactory f = (SocketImplFactory)clazz.getMethod("getFactory", new Class[0]).invoke(null, new Object[0]); // null receiver: getFactory is expected to be static
+ Socket.setSocketImplFactory(f);
+ }
+ catch (SocketException e) {}
+ catch (Exception e){
+ throw new RuntimeException(e);
+ }
+ }
+
+ //To prevent doing hod if the pig server is constructed multiple times
+ private static String hodMapRed;
+ private static String hodHDFS;
+
+ private enum ParsingState {
+ NOTHING, HDFSUI, MAPREDUI, HDFS, MAPRED, HADOOPCONF
+ };
+
+ private String[] doHod(String server) {
+ if (hodMapRed != null) {
+ return new String[] {hodHDFS, hodMapRed};
+ }
+
+ try {
+ Process p = null;
+ // Make the kryptonite released version the default if nothing
+ // else is specified.
+ StringBuilder cmd = new StringBuilder();
+ cmd.append(System.getProperty("hod.expect.root"));
+ cmd.append('/');
+ cmd.append("libexec/pig/");
+ cmd.append(System.getProperty("hod.expect.uselatest"));
+ cmd.append('/');
+ cmd.append(System.getProperty("hod.command"));
+
+ String cluster = System.getProperty("yinst.cluster");
+
+ // TODO This is a Yahoo specific holdover, need to remove
+ // this.
+ if (cluster != null && cluster.length() > 0 && !cluster.startsWith("kryptonite")) {
+ cmd.append(" --config=");
+ cmd.append(System.getProperty("hod.config.dir"));
+ cmd.append('/');
+ cmd.append(cluster);
+ }
+
+ cmd.append(" " + System.getProperty("hod.param", ""));
+
+ if (server.equals("local")) {
+ p = Runtime.getRuntime().exec(cmd.toString());
+ }
+ else {
+ SSHSocketImplFactory fac = SSHSocketImplFactory.getFactory(server);
+ p = fac.ssh(cmd.toString());
+ }
+
+ InputStream is = p.getInputStream();
+
+ logger.info("Connecting to HOD...");
+ logger.debug("sending HOD command " + cmd.toString());
+
+ StringBuffer sb = new StringBuffer();
+ int c;
+ String hdfsUI = null;
+ String mapredUI = null;
+ String hdfs = null;
+ String mapred = null;
+ String hadoopConf = null;
+
+ ParsingState current = ParsingState.NOTHING;
+
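+            // The HOD output is scanned for key/value lines of the (assumed) form
+            // "hdfs:<host:port>", "mapred:<host:port>", "hdfsUI:<url>", "mapredUI:<url>"
+            // and "hadoopConf:<path>", one value per line; scanning stops once the
+            // mapred value has been seen.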
+ while((c = is.read()) != -1 && mapred == null) {
+ if (c == '\n' || c == '\r') {
+ switch(current) {
+ case HDFSUI:
+ hdfsUI = sb.toString().trim();
+ logger.info("HDFS Web UI: " + hdfsUI);
+ break;
+ case HDFS:
+ hdfs = sb.toString().trim();
+ logger.info("HDFS: " + hdfs);
+ break;
+ case MAPREDUI:
+ mapredUI = sb.toString().trim();
+ logger.info("JobTracker Web UI: " + mapredUI);
+ break;
+ case MAPRED:
+ mapred = sb.toString().trim();
+ logger.info("JobTracker: " + mapred);
+ break;
+ case HADOOPCONF:
+ hadoopConf = sb.toString().trim();
+ logger.info("HadoopConf: " + hadoopConf);
+ break;
+ }
+ current = ParsingState.NOTHING;
+ sb = new StringBuffer();
+ }
+ sb.append((char)c);
+ if (sb.indexOf("hdfsUI:") != -1) {
+ current = ParsingState.HDFSUI;
+ sb = new StringBuffer();
+ }
+ else if (sb.indexOf("hdfs:") != -1) {
+ current = ParsingState.HDFS;
+ sb = new StringBuffer();
+ }
+ else if (sb.indexOf("mapredUI:") != -1) {
+ current = ParsingState.MAPREDUI;
+ sb = new StringBuffer();
+ }
+ else if (sb.indexOf("mapred:") != -1) {
+ current = ParsingState.MAPRED;
+ sb = new StringBuffer();
+ }
+ else if (sb.indexOf("hadoopConf:") != -1) {
+ current = ParsingState.HADOOPCONF;
+ sb = new StringBuffer();
+ }
+ }
+
+ hdfsUI = fixUpDomain(hdfsUI);
+ hdfs = fixUpDomain(hdfs);
+ mapredUI = fixUpDomain(mapredUI);
+ mapred = fixUpDomain(mapred);
+ hodHDFS = hdfs;
+ hodMapRed = mapred;
+
+ if (hadoopConf != null) {
+ JobConf jobConf = new JobConf(hadoopConf);
+ jobConf.addResource("pig-cluster-hadoop-site.xml");
+
+ conf = new HConfiguration(jobConf);
+
+ // make sure that files on class path are used
+ System.out.println("Job Conf = " + conf);
+ System.out.println("dfs.block.size= " + conf.get("dfs.block.size"));
+ System.out.println("ipc.client.timeout= " + conf.get("ipc.client.timeout"));
+ System.out.println("mapred.child.java.opts= " + conf.get("mapred.child.java.opts"));
+ }
+ else {
+ throw new IOException("Missing Hadoop configuration file");
+ }
+ return new String[] {hdfs, mapred};
+ }
+ catch (Exception e) {
+ logger.fatal("Could not connect to HOD", e);
+ System.exit(4);
+ }
+ throw new RuntimeException("Could not scrape needed information.");
+ }
+
+ private String fixUpDomain(String hostPort) throws UnknownHostException {
+ String parts[] = hostPort.split(":");
+ if (parts[0].indexOf('.') == -1) {
+ parts[0] = parts[0] + ".inktomisearch.com";
+ }
+ InetAddress.getByName(parts[0]);
+ return parts[0] + ":" + parts[1];
+ }
+
+}
+
+
+
+
Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=617338&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java Thu Jan 31 19:11:28 2008
@@ -0,0 +1,136 @@
+package org.apache.pig.backend.hadoop.executionengine;
+
+import java.io.OutputStream;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+
+
+public class HJob implements ExecJob {
+
+ protected JOB_STATUS status;
+ protected PigContext pigContext;
+ protected FileSpec outFileSpec;
+
+ public HJob(JOB_STATUS status,
+ PigContext pigContext,
+ FileSpec outFileSpec) {
+ this.status = status;
+ this.pigContext = pigContext;
+ this.outFileSpec = outFileSpec;
+ }
+
+ @Override
+ public JOB_STATUS getStatus() {
+ return status;
+ }
+
+ @Override
+ public boolean hasCompleted() throws ExecException {
+ return true;
+ }
+
+ @Override
+ public Iterator<Tuple> getResults() throws ExecException {
+ final LoadFunc p;
+
+ try{
+ p = (LoadFunc)PigContext.instantiateFuncFromSpec(outFileSpec.getFuncSpec());
+
+ InputStream is = FileLocalizer.open(outFileSpec.getFileName(), pigContext);
+
+ p.bindTo(outFileSpec.getFileName(), new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
+
+ }catch (Exception e){
+ throw new ExecException("Unable to get results for " + outFileSpec, e);
+ }
+
+ return new Iterator<Tuple>() {
+ Tuple t;
+ boolean atEnd;
+
+ public boolean hasNext() {
+ if (atEnd)
+ return false;
+ try {
+ if (t == null)
+ t = p.getNext();
+ if (t == null)
+ atEnd = true;
+ } catch (Exception e) {
+ e.printStackTrace();
+ t = null;
+ atEnd = true;
+ }
+ return !atEnd;
+ }
+
+ public Tuple next() {
+ Tuple next = t;
+ if (next != null) {
+ t = null;
+ return next;
+ }
+ try {
+ next = p.getNext();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ if (next == null)
+ atEnd = true;
+ return next;
+ }
+
+ public void remove() {
+ throw new RuntimeException("Removal not supported");
+ }
+
+ };
+ }
+
+ @Override
+ public Properties getContiguration() {
+ Properties props = new Properties();
+ return props;
+ }
+
+ @Override
+ public Map<String, Object> getStatistics() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void completionNotification(Object cookie) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void kill() throws ExecException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void getLogs(OutputStream log) throws ExecException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void getSTDOut(OutputStream out) throws ExecException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void getSTDError(OutputStream error) throws ExecException {
+ throw new UnsupportedOperationException();
+ }
+}
Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedPhysicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedPhysicalPlan.java?rev=617338&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedPhysicalPlan.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedPhysicalPlan.java Thu Jan 31 19:11:28 2008
@@ -0,0 +1,56 @@
+package org.apache.pig.backend.hadoop.executionengine;
+
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
+import org.apache.pig.backend.local.executionengine.POVisitor;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+
+public class MapRedPhysicalPlan implements ExecPhysicalPlan {
+ private static final long serialVersionUID = 1;
+
+ protected OperatorKey root;
+ protected Map<OperatorKey, ExecPhysicalOperator> opTable;
+
+ MapRedPhysicalPlan(OperatorKey root,
+ Map<OperatorKey, ExecPhysicalOperator> opTable) {
+ this.root = root;
+ this.opTable = opTable;
+ }
+
+ @Override
+ public Properties getConfiguration() {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public void updateConfiguration(Properties configuration)
+ throws ExecException {
+ // TODO
+ }
+
+ @Override
+ public void explain(OutputStream out) {
+ POVisitor lprinter = new POVisitor(new PrintStream(out));
+
+ ((PhysicalOperator)opTable.get(root)).visit(lprinter, "");
+ }
+
+ @Override
+ public Map<OperatorKey, ExecPhysicalOperator> getOpTable() {
+ return opTable;
+ }
+
+ @Override
+ public OperatorKey getRoot() {
+ return this.root;
+ }
+}
+
Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedResult.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedResult.java?rev=617338&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedResult.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedResult.java Thu Jan 31 19:11:28 2008
@@ -0,0 +1,15 @@
+package org.apache.pig.backend.hadoop.executionengine;
+
+import org.apache.pig.impl.io.FileSpec;
+
+public class MapRedResult {
+ public FileSpec outFileSpec;
+ public int parallelismRequest;
+
+ public MapRedResult(FileSpec outFileSpec,
+ int parallelismRequest) {
+ this.outFileSpec = outFileSpec;
+ this.parallelismRequest = parallelismRequest;
+ }
+}
+
Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java?rev=617338&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java Thu Jan 31 19:11:28 2008
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.FunctionInstantiator;
+import org.apache.pig.impl.builtin.FindQuantiles;
+import org.apache.pig.impl.builtin.RandomSampleLoader;
+import org.apache.pig.impl.eval.BinCondSpec;
+import org.apache.pig.impl.eval.ConstSpec;
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.eval.FilterSpec;
+import org.apache.pig.impl.eval.FuncEvalSpec;
+import org.apache.pig.impl.eval.GenerateSpec;
+import org.apache.pig.impl.eval.ProjectSpec;
+import org.apache.pig.impl.eval.CompositeEvalSpec;
+import org.apache.pig.impl.eval.MapLookupSpec;
+import org.apache.pig.impl.eval.SortDistinctSpec;
+import org.apache.pig.impl.eval.StarSpec;
+import org.apache.pig.impl.eval.EvalSpecVisitor;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOEval;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LOSort;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LOStore;
+import org.apache.pig.impl.logicalLayer.LOUnion;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.physicalLayer.PlanCompiler;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.SortPartitioner;
+import org.apache.pig.backend.hadoop.datastorage.HFile;
+import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+
+// compiler for mapreduce physical plans
+public class MapreducePlanCompiler {
+
+ protected PigContext pigContext;
+ protected NodeIdGenerator nodeIdGenerator;
+
+ protected MapreducePlanCompiler(PigContext pigContext) {
+ this.pigContext = pigContext;
+ this.nodeIdGenerator = NodeIdGenerator.getGenerator();
+ }
+
+ private String getTempFile(PigContext pigcontext) throws IOException {
+ return FileLocalizer.getTemporaryPath(null,
+ pigContext).toString();
+ }
+
+ public OperatorKey compile(OperatorKey logicalKey,
+ Map<OperatorKey, LogicalOperator> logicalOpTable,
+ HExecutionEngine execEngine) throws IOException {
+
+        // check to see if we have materialized results for the logical tree to
+        // compile; if so, re-use them...
+ //
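+        // (A logical subtree that was already executed, and therefore has an output
+        // file recorded by the execution engine, is simply re-loaded from that file
+        // instead of being recompiled and re-run.)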
+ Map<OperatorKey, MapRedResult> materializedResults = execEngine.getMaterializedResults();
+
+ MapRedResult materializedResult = materializedResults.get(logicalKey);
+
+ if (materializedResult != null) {
+ POMapreduce pom = new POMapreduce(logicalKey.getScope(),
+ nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
+ execEngine.getPhysicalOpTable(),
+ logicalKey,
+ pigContext);
+
+ String filename = FileLocalizer.fullPath(materializedResult.outFileSpec.getFileName(), pigContext);
+ FileSpec fileSpec = new FileSpec(filename, materializedResult.outFileSpec.getFuncSpec());
+ pom.addInputFile(fileSpec);
+ pom.mapParallelism = Math.max(pom.mapParallelism, materializedResult.parallelismRequest);
+
+ return pom.getOperatorKey();
+ }
+
+ // first, compile inputs into MapReduce operators
+ OperatorKey[] compiledInputs = new OperatorKey[logicalOpTable.get(logicalKey).getInputs().size()];
+
+ for (int i = 0; i < logicalOpTable.get(logicalKey).getInputs().size(); i++)
+ compiledInputs[i] = compile(logicalOpTable.get(logicalKey).getInputs().get(i),
+ logicalOpTable,
+ execEngine);
+
+ // then, compile this operator; if possible, merge with previous MapReduce
+ // operator rather than introducing a new one
+
+ LogicalOperator lo = logicalOpTable.get(logicalKey);
+
+ if (lo instanceof LOEval) {
+ POMapreduce pom = ((POMapreduce)execEngine.getPhysicalOpTable().get(compiledInputs[0]))
+ .copy(nodeIdGenerator.getNextNodeId(logicalKey.getScope())); // make a copy of the previous
+
+            // need to do this because the copy is implemented via serialize/deserialize and the constructor is not
+            // invoked for pom.
+ execEngine.getPhysicalOpTable().put(pom.getOperatorKey(), pom);
+
+ // MapReduce operator (NOTE: it's important that we make a copy rather than using it
+ // directly; this matters in the case of a "split")
+
+ pushInto((LOEval) lo, pom); // add the computation specified by "lo" to this mapreduce
+ // operator
+
+ return pom.getOperatorKey();
+ }
+ else if (lo instanceof LOCogroup) {
+ POMapreduce pom = new POMapreduce(logicalKey.getScope(),
+ nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
+ execEngine.getPhysicalOpTable(),
+ logicalKey,
+ pigContext,
+ -1,
+ logicalOpTable.get(logicalKey).getRequestedParallelism());
+
+ pom.groupFuncs = (((LOCogroup) lo).getSpecs());
+
+ connectInputs(compiledInputs, pom, execEngine.getPhysicalOpTable());
+
+ return pom.getOperatorKey();
+ }
+ else if (lo instanceof LOSplitOutput){
+ POMapreduce child = (POMapreduce)execEngine.getPhysicalOpTable().get(compiledInputs[0]);
+ String fileName = child.toSplit.tempFiles.get(((LOSplitOutput)lo).getReadFrom());
+ POMapreduce pom = new POMapreduce(lo.getScope(),
+ nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
+ execEngine.getPhysicalOpTable(),
+ logicalKey,
+ pigContext,
+ child.getOperatorKey());
+ FileSpec inputFileSpec = new FileSpec(fileName, BinStorage.class.getName());
+ pom.addInputFile(inputFileSpec);
+ return pom.getOperatorKey();
+ }
+ else if (lo instanceof LOSplit) {
+ //Make copy of previous operator
+ POMapreduce pom = ((POMapreduce)execEngine.getPhysicalOpTable().get(compiledInputs[0])).
+ copy(nodeIdGenerator.getNextNodeId(logicalKey.getScope()));
+
+            // need to do this because the copy is implemented via serialize/deserialize and the constructor is not
+            // invoked for pom.
+ execEngine.getPhysicalOpTable().put(pom.getOperatorKey(), pom);
+
+ pom.toSplit = new SplitSpec((LOSplit)lo, pigContext);
+
+            //Technically, we don't need the output to be set in the split case,
+            //because nothing will go to the output. But other code assumes it's
+            //non-null, so we set it to a temp file.
+ FileSpec fileSpec = new FileSpec(getTempFile(pigContext), BinStorage.class.getName());
+ pom.outputFileSpec = fileSpec;
+ return pom.getOperatorKey();
+ }
+ else if (lo instanceof LOLoad) {
+ POMapreduce pom = new POMapreduce(lo.getScope(),
+ nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
+ execEngine.getPhysicalOpTable(),
+ logicalKey,
+ pigContext,
+ compiledInputs);
+ LOLoad loLoad = (LOLoad) lo;
+ String filename = FileLocalizer.fullPath(loLoad.getInputFileSpec().getFileName(), pigContext);
+ FileSpec fileSpec = new FileSpec(filename, loLoad.getInputFileSpec().getFuncSpec());
+ pom.addInputFile(fileSpec);
+ pom.mapParallelism = Math.max(pom.mapParallelism, lo.getRequestedParallelism());
+ return pom.getOperatorKey();
+ }
+ else if (lo instanceof LOStore) {
+ LOStore los = (LOStore) lo;
+ ((POMapreduce)execEngine.getPhysicalOpTable().get(compiledInputs[0])).outputFileSpec = los.getOutputFileSpec();
+
+ ((POMapreduce)execEngine.getPhysicalOpTable().get(compiledInputs[0])).sourceLogicalKey =
+ los.getInputs().get(0);
+
+ return compiledInputs[0];
+ }
+ else if (lo instanceof LOUnion) {
+ POMapreduce pom = new POMapreduce(lo.getScope(),
+ nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
+ execEngine.getPhysicalOpTable(),
+ logicalKey,
+ pigContext,
+ -1,
+ lo.getRequestedParallelism());
+ connectInputs(compiledInputs, pom, execEngine.getPhysicalOpTable());
+ return pom.getOperatorKey();
+ }
+ else if (lo instanceof LOSort) {
+ LOSort loSort = (LOSort) lo;
+ //must break up into 2 map reduce jobs, one for gathering quantiles, another for sorting
+ POMapreduce quantileJob = getQuantileJob(lo.getScope(),
+ nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
+ execEngine.getPhysicalOpTable(),
+ logicalKey,
+ (POMapreduce) (execEngine.getPhysicalOpTable().get(compiledInputs[0])),
+ loSort);
+
+ return getSortJob(lo.getScope(),
+ nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
+ execEngine.getPhysicalOpTable(),
+ logicalKey,
+ quantileJob,
+ loSort).getOperatorKey();
+ }
+ throw new IOException("Unknown logical operator.");
+ }
+
+ // added for UNION:
+ // returns true iff okay to merge this operator with a subsequent binary op (e.g., co-group or union).
+ // this is the case iff (1) this operator doesn't do grouping (which requires its own reduce phase), and (2) this operator isn't itself a binary op
+ private boolean okayToMergeWithBinaryOp(POMapreduce mro) {
+ return (!mro.doesGrouping() && (mro.numInputFiles() == 1));
+ }
+
+ private void connectInputs(OperatorKey[] compiledInputs,
+ POMapreduce pom,
+ Map<OperatorKey, ExecPhysicalOperator> physicalOpTable) throws IOException {
+ // connect inputs (by merging operators, if possible; else connect via temp files)
+ for (int i = 0; i < compiledInputs.length; i++) {
+ if (okayToMergeWithBinaryOp((POMapreduce)physicalOpTable.get(compiledInputs[i]))) {
+ // can merge input i with this operator
+ pom.addInputFile(((POMapreduce)physicalOpTable.get(compiledInputs[i])).getFileSpec(0),
+ ((POMapreduce)physicalOpTable.get(compiledInputs[i])).getEvalSpec(0));
+ pom.addInputOperators(((PhysicalOperator)physicalOpTable.get(compiledInputs[i])).inputs);
+ } else {
+ // chain together via a temp file
+ String tempFile = getTempFile(pigContext);
+ FileSpec fileSpec = new FileSpec( tempFile, BinStorage.class.getName());
+ ((POMapreduce)physicalOpTable.get(compiledInputs[i])).outputFileSpec = fileSpec;
+ pom.addInputFile(fileSpec);
+ pom.addInputOperator(compiledInputs[i]);
+ }
+ }
+ }
+
+ // push the function evaluated by "lo" into the map-reduce operator "mro"
+ private void pushInto(LOEval lo, POMapreduce mro) throws IOException {
+
+ if (!mro.doesGrouping()) { // push into "map" phase
+
+ // changed for UNION:
+ for (int index = 0; index < mro.toMap.size(); index++) {
+ mro.addMapSpec(index, lo.getSpec());
+ }
+ //int index = mro.toMap.list.size() - 1;
+ //mro.toMap.list.get(index).add(lo.spec);
+
+ mro.mapParallelism = Math.max(mro.mapParallelism, lo.getRequestedParallelism());
+
+ } else { // push into "reduce" phase
+
+ EvalSpec spec = lo.getSpec();
+
+ if (mro.toReduce == null && shouldCombine(spec)) {
+ // Push this spec into the combiner. But we also need to
+ // create a new spec with a changed expected projection to
+ // push into the reducer.
+
+ if (mro.toCombine != null) {
+ throw new AssertionError("Combiner already set.");
+ }
+ // mro.toCombine = spec;
+
+ // Now, we need to adjust the expected projection for the
+ // eval spec(s) we just pushed. Also, this will change the
+                // function to the final instance instead of the general instance.
+ EvalSpec newSpec = spec.copy(pigContext);
+ newSpec.visit(new ReduceAdjuster(pigContext));
+ mro.addReduceSpec(newSpec);
+
+ // Adjust the function name for the combine spec, to set it
+ // to the initial function instead of the general
+ // instance. Make a copy of the eval spec rather than
+ // adjusting the existing one, to prevent breaking the
+ // logical plan in case another physical plan is generated
+ // from it later.
+ EvalSpec combineSpec = spec.copy(pigContext);
+ combineSpec.visit(new CombineAdjuster());
+ mro.toCombine = combineSpec;
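+                // Net effect (illustrative): for an algebraic function the combiner now
+                // invokes the initial instance and the reducer the final instance, with
+                // the reducer's argument rewritten to project the combiner's output.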
+
+ } else {
+ mro.addReduceSpec(lo.getSpec()); // otherwise, don't use combiner
+ }
+
+ mro.reduceParallelism = Math.max(mro.reduceParallelism, lo.getRequestedParallelism());
+
+ }
+ }
+
+
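+    // Order-by is realized as two jobs: the first samples the materialized input
+    // with RandomSampleLoader and uses FindQuantiles to write a quantiles file;
+    // the second re-reads the full input and range-partitions it with
+    // SortPartitioner, which consults that quantiles file, so that the reduce
+    // outputs are globally ordered.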
+ private POMapreduce getQuantileJob(String scope,
+ long id,
+ Map<OperatorKey, ExecPhysicalOperator> physicalOpTable,
+ OperatorKey logicalKey,
+ POMapreduce input,
+ LOSort loSort) throws IOException{
+ //first the quantile job
+ POMapreduce quantileJob = new POMapreduce(scope,
+ id,
+ physicalOpTable,
+ logicalKey,
+ pigContext,
+ input.getOperatorKey());
+ //first materialize the output of the previous stage
+ String fileName = getTempFile(pigContext);
+ input.outputFileSpec = new FileSpec(fileName,BinStorage.class.getName());
+
+ //Load the output using a random sample load function
+ FileSpec inputFileSpec = new FileSpec(fileName, RandomSampleLoader.class.getName());
+ quantileJob.addInputFile(inputFileSpec);
+
+ quantileJob.addMapSpec(0, loSort.getSortSpec());
+
+ //Constructing the query structures by hand, quite ugly.
+
+ //group all
+ ArrayList<EvalSpec> groupFuncs = new ArrayList<EvalSpec>();
+
+ groupFuncs.add(new GenerateSpec(new ConstSpec("all")).getGroupBySpec());
+
+ quantileJob.groupFuncs = groupFuncs;
+
+ //find the quantiles in the reduce step
+ ArrayList<EvalSpec> argsList = new ArrayList<EvalSpec>();
+ argsList.add(new ConstSpec(Math.max(loSort.getRequestedParallelism()-1,1)));
+
+ //sort the first column of the cogroup output and feed it to the quantiles function
+ EvalSpec sortedSampleSpec = new ProjectSpec(1);
+ sortedSampleSpec = sortedSampleSpec.addSpec(new SortDistinctSpec(false, new StarSpec()));
+ argsList.add(sortedSampleSpec);
+
+ EvalSpec args = new GenerateSpec(argsList);
+
+ EvalSpec reduceSpec = new FuncEvalSpec(pigContext, FindQuantiles.class.getName(), args);
+ reduceSpec.setFlatten(true);
+ quantileJob.addReduceSpec(new GenerateSpec(reduceSpec));
+
+ //a temporary file to hold the quantile data
+ String quantileFile = getTempFile(pigContext);
+ quantileJob.outputFileSpec = new FileSpec(quantileFile, BinStorage.class.getName());
+
+ return quantileJob;
+ }
+
+ public POMapreduce getSortJob(String scope,
+ long id,
+ Map<OperatorKey, ExecPhysicalOperator> physicalOpTable,
+ OperatorKey logicalKey,
+ POMapreduce quantileJob,
+ LOSort loSort) throws IOException{
+ POMapreduce sortJob = new POMapreduce(scope,
+ id,
+ physicalOpTable,
+ logicalKey,
+ pigContext,
+ quantileJob.getOperatorKey());
+
+ sortJob.quantilesFile = quantileJob.outputFileSpec.getFileName();
+
+        //same input as the quantile job, but with the full BinStorage load function
+ sortJob.addInputFile(new FileSpec(quantileJob.getFileSpec(0).getFileName(), BinStorage.class.getName()));
+
+ ArrayList<EvalSpec> groupFuncs = new ArrayList<EvalSpec>();
+
+ groupFuncs.add(new GenerateSpec(loSort.getSortSpec()).getGroupBySpec());
+
+ sortJob.groupFuncs = groupFuncs;
+ sortJob.partitionFunction = SortPartitioner.class;
+
+ ProjectSpec ps = new ProjectSpec(1);
+ ps.setFlatten(true);
+ sortJob.addReduceSpec(new GenerateSpec(ps));
+
+ sortJob.reduceParallelism = loSort.getRequestedParallelism();
+
+ String comparatorFuncName = loSort.getSortSpec().getComparatorName();
+ if (comparatorFuncName != null) {
+ sortJob.userComparator =
+ (Class<WritableComparator>)PigContext.resolveClassName(
+ comparatorFuncName);
+ }
+
+ return sortJob;
+ }
+
+ private boolean shouldCombine(EvalSpec spec) {
+        // Determine whether this is something we can combine or not.
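+        // For illustration (assuming an algebraic UDF such as COUNT): a spec of the
+        // shape GenerateSpec[ ProjectSpec(0), FuncEvalSpec(COUNT, ...) ], roughly
+        // "generate group, COUNT(...)", satisfies all of the checks below, whereas
+        // anything containing a FilterSpec or SortDistinctSpec does not.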
+ // First, it must be a generate spec.
+ if (!(spec instanceof GenerateSpec)) {
+ return false;
+ }
+
+ GenerateSpec gen = (GenerateSpec)spec;
+
+ // Second, the first immediate child of the generate spec must be
+ // a project with a value of 0.
+ Iterator<EvalSpec> i = gen.getSpecs().iterator();
+ if (!i.hasNext()) return false;
+ EvalSpec s = i.next();
+ if (!(s instanceof ProjectSpec)) {
+ return false;
+ } else {
+ ProjectSpec p = (ProjectSpec)s;
+ if (p.numCols() > 1) return false;
+ else if (p.getCol() != 0) return false;
+ }
+
+ // Third, all subsequent immediate children of the generate spec
+ // must be func eval specs
+ while (i.hasNext()) {
+ s = i.next();
+ if (!(s instanceof FuncEvalSpec)) return false;
+ }
+
+        // Finally, walk the entire tree of the generate spec and see if we
+        // can combine it.
+ CombineDeterminer cd = new CombineDeterminer();
+ gen.visit(cd);
+ return cd.useCombiner();
+ }
+
+ private class ReduceAdjuster extends EvalSpecVisitor {
+ private int position = 0;
+ FunctionInstantiator instantiator = null;
+
+ public ReduceAdjuster(FunctionInstantiator fi) {
+ instantiator = fi;
+ }
+
+ public void visitGenerate(GenerateSpec g) {
+ Iterator<EvalSpec> i = g.getSpecs().iterator();
+ for (position = 0; i.hasNext(); position++) {
+ i.next().visit(this);
+ }
+ }
+
+ public void visitFuncEval(FuncEvalSpec fe) {
+ // Need to replace our arg spec with a project of our position.
+ // DON'T visit our args, they're exactly what we're trying to
+ // lop off.
+ // The first ProjectSpec in the Composite is because the tuples
+ // will come out of the combiner in the form (groupkey,
+ // {(x, y, z)}). The second ProjectSpec contains the offset of
+ // the projection element we're interested in.
+ CompositeEvalSpec cs = new CompositeEvalSpec(new ProjectSpec(1));
+ cs.addSpec(new ProjectSpec(position));
+ fe.setArgs(new GenerateSpec(cs));
+
+
+ // Reset the function to call the final instance of itself
+ // instead of the general instance. Have to instantiate the
+ // function itself first so we can find out if it's algebraic
+ // or not.
+ try {
+ fe.instantiateFunc(instantiator);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ fe.resetFuncToFinal();
+ }
+ }
+
+ private class CombineAdjuster extends EvalSpecVisitor {
+ private int position = 0;
+
+ public void visitFuncEval(FuncEvalSpec fe) {
+ // Reset the function to call the initial instance of itself
+ // instead of the general instance.
+ fe.resetFuncToInitial();
+ }
+ }
+
+ private class CombineDeterminer extends EvalSpecVisitor {
+ private class FuncCombinable extends EvalSpecVisitor {
+ public boolean combinable = true;
+
+ @Override
+ public void visitBinCond(BinCondSpec bc) {
+ combinable = false;
+ }
+
+ @Override
+ public void visitFilter(FilterSpec bc) {
+ combinable = false;
+ }
+
+ @Override
+ public void visitFuncEval(FuncEvalSpec bc) {
+ combinable = false;
+ }
+
+ @Override
+ public void visitSortDistinct(SortDistinctSpec bc) {
+ combinable = false;
+ }
+ };
+
+ private int shouldCombine = 0;
+
+ public boolean useCombiner() {
+ return shouldCombine > 0;
+ }
+
+ @Override
+ public void visitBinCond(BinCondSpec bc) {
+ // TODO Could be true if both are true. But the logic in
+ // CombineAdjuster and ReduceAdjuster don't know how to handle
+ // binconds, so just do false for now.
+ shouldCombine = -1;
+ }
+
+ @Override
+ public void visitCompositeEval(CompositeEvalSpec ce) {
+ // If we've already determined we're not combinable, stop.
+ if (shouldCombine < 0) return;
+
+ for (EvalSpec spec: ce.getSpecs()) {
+ spec.visit(this);
+ }
+ }
+
+        // ConstSpec is a NOP, as it neither benefits from nor
+        // prevents combinability.
+
+ @Override
+ public void visitFilter(FilterSpec f) {
+ shouldCombine = -1;
+ }
+
+ @Override
+ public void visitFuncEval(FuncEvalSpec fe) {
+            // Check the function's arguments to make sure they are
+            // combinable.
+ FuncCombinable fc = new FuncCombinable();
+ fe.getArgs().visit(fc);
+ if (!fc.combinable) {
+ shouldCombine = -1;
+ return;
+ }
+
+ if (fe.combinable()) shouldCombine = 1;
+ else shouldCombine = -1;
+ }
+
+ @Override
+ public void visitGenerate(GenerateSpec g) {
+ // If we've already determined we're not combinable, stop.
+ if (shouldCombine < 0) return;
+
+ for (EvalSpec spec: g.getSpecs()) {
+ spec.visit(this);
+ }
+ }
+
+        // MapLookupSpec is a NOP, as it neither benefits from nor
+        // prevents combinability.
+
+        // ProjectSpec is a NOP, as it neither benefits from nor
+        // prevents combinability.
+
+ @Override
+ public void visitSortDistinct(SortDistinctSpec sd) {
+ shouldCombine = -1;
+ }
+
+        // StarSpec is a NOP, as it neither benefits from nor
+        // prevents combinability.
+ }
+
+}
Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java?rev=617338&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java Thu Jan 31 19:11:28 2008
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
+import org.apache.pig.backend.local.executionengine.POVisitor;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.eval.StarSpec;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.PigLogger;
+
+public class POMapreduce extends PhysicalOperator {
+ private static final long serialVersionUID = 1L;
+
+ public ArrayList<EvalSpec> toMap = new ArrayList<EvalSpec>();
+ public EvalSpec toCombine = null;
+ public EvalSpec toReduce = null;
+ public ArrayList<EvalSpec> groupFuncs = null;
+ public SplitSpec toSplit = null;
+ public ArrayList<FileSpec> inputFileSpecs = new ArrayList<FileSpec>();
+ public FileSpec outputFileSpec = null;
+ public Class partitionFunction = null;
+ public Class<WritableComparator> userComparator = null;
+ public String quantilesFile = null;
+ public PigContext pigContext;
+
+ public OperatorKey sourceLogicalKey;
+
+ public int mapParallelism = -1; // -1 means let hadoop decide
+ public int reduceParallelism = -1;
+
+
+ static MapReduceLauncher mapReduceLauncher = new MapReduceLauncher();
+
+ public String getScope() {
+ return scope;
+ }
+
+ public boolean doesGrouping() {
+ return !(groupFuncs == null);
+ }
+
+ public boolean doesSplitting() {
+ return toSplit !=null;
+ }
+
+ public boolean doesSorting() {
+ return quantilesFile!=null;
+ }
+
+ public int numInputFiles() {
+ if (inputFileSpecs == null)
+ return 0;
+ else
+ return inputFileSpecs.size();
+ }
+
+ public POMapreduce(String scope,
+ long id,
+ Map<OperatorKey, ExecPhysicalOperator> opTable,
+ OperatorKey sourceLogicalKey,
+ PigContext pigContext,
+ int mapParallelism,
+ int reduceParallelism) {
+ this(scope, id, opTable, sourceLogicalKey, pigContext);
+ this.mapParallelism = mapParallelism;
+ this.reduceParallelism = reduceParallelism;
+ }
+
+ public POMapreduce(String scope,
+ long id,
+ Map<OperatorKey, ExecPhysicalOperator> opTable,
+ OperatorKey sourceLogicalKey,
+ PigContext pigContext,
+ OperatorKey[] inputsIn) {
+ super(scope, id, opTable, LogicalOperator.FIXED);
+ this.sourceLogicalKey = sourceLogicalKey;
+ this.pigContext = pigContext;
+ inputs = inputsIn;
+ }
+
+ public POMapreduce(String scope,
+ long id,
+ Map<OperatorKey, ExecPhysicalOperator> opTable,
+ OperatorKey sourceLogicalKey,
+ PigContext pigContext,
+ OperatorKey inputIn) {
+ super(scope, id, opTable, LogicalOperator.FIXED);
+ this.sourceLogicalKey = sourceLogicalKey;
+ this.pigContext = pigContext;
+ inputs = new OperatorKey[1];
+ inputs[0] = inputIn;
+ }
+
+ public POMapreduce(String scope,
+ long id,
+ Map<OperatorKey, ExecPhysicalOperator> opTable,
+ OperatorKey sourceLogicalKey,
+ PigContext pigContext) {
+ super(scope, id, opTable, LogicalOperator.FIXED);
+ this.sourceLogicalKey = sourceLogicalKey;
+ this.pigContext = pigContext;
+ inputs = new OperatorKey[0];
+ }
+
+ public void addInputOperator(OperatorKey newInput) {
+ OperatorKey[] oldInputs = inputs;
+ inputs = new OperatorKey[oldInputs.length + 1];
+ for (int i = 0; i < oldInputs.length; i++)
+ inputs[i] = oldInputs[i];
+ inputs[inputs.length - 1] = newInput;
+ }
+
+ public void addInputOperators(OperatorKey[] newInputs) {
+ for (int i = 0; i < newInputs.length; i++) {
+ addInputOperator(newInputs[i]);
+ }
+ }
+
+ public void addInputFile(FileSpec fileSpec){
+ addInputFile(fileSpec, new StarSpec());
+ }
+
+ public void addInputFile(FileSpec fileSpec, EvalSpec evalSpec){
+ inputFileSpecs.add(fileSpec);
+ toMap.add(evalSpec);
+ }
+
+
+ @Override
+ public boolean open() throws IOException {
+ // first, call open() on all inputs
+ if (inputs != null) {
+ for (int i = 0; i < inputs.length; i++) {
+ if (!((PhysicalOperator)opTable.get(inputs[i])).open())
+ return false;
+ }
+ }
+
+ // then, have hadoop run this MapReduce job:
+ if (pigContext.debug) print();
+ boolean success = mapReduceLauncher.launchPig(this);
+ if (!success) {
+ // XXX: If we throw an exception on failure, why do we return a boolean ?!? - ben
+ throw new IOException("Job failed");
+ }
+ return success;
+ }
+
+ @Override
+ public Tuple getNext() throws IOException {
+ // drain all inputs
+ for (int i = 0; i < inputs.length; i++) {
+ while (((PhysicalOperator)opTable.get(inputs[i])).getNext() != null)
+ ;
+ }
+
+ // indicate that we are done
+ return null;
+ }
+
+ public int numMRJobs() {
+ int numInputJobs = 0;
+ if (inputs != null) {
+ for (OperatorKey i : inputs) {
+ numInputJobs += ((POMapreduce) opTable.get(i)).numMRJobs();
+ }
+ }
+ return 1 + numInputJobs;
+ }
+
+ void print() {
+ Logger log = PigLogger.getLogger();
+ log.info("----- MapReduce Job -----");
+ log.info("Input: " + inputFileSpecs);
+ log.info("Map: " + toMap);
+ log.info("Group: " + groupFuncs);
+ log.info("Combine: " + toCombine);
+ log.info("Reduce: " + toReduce);
+ log.info("Output: " + outputFileSpec);
+ log.info("Split: " + toSplit);
+ log.info("Map parallelism: " + mapParallelism);
+ log.info("Reduce parallelism: " + reduceParallelism);
+ }
+
+ public POMapreduce copy(long id){
+ try{
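+            // The deep copy is implemented via serialize/deserialize; the shared operator
+            // table is detached first so it is not dragged through serialization, and the
+            // copy is wired back to it below.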
+ Map<OperatorKey, ExecPhysicalOperator> srcOpTable = this.opTable;
+ this.opTable = null;
+
+ POMapreduce copy = ((POMapreduce)ObjectSerializer.deserialize(ObjectSerializer.serialize(this)));
+
+ copy.pigContext = pigContext;
+ copy.inputs = inputs;
+ copy.opTable = srcOpTable;
+ copy.id = id;
+ return copy;
+ }catch(IOException e){
+ throw new RuntimeException(e);
+ }
+ }
+
+ public EvalSpec getEvalSpec(int i){
+ return toMap.get(i);
+ }
+
+ public FileSpec getFileSpec(int i){
+ return inputFileSpecs.get(i);
+ }
+
+ public void addMapSpec(int i, EvalSpec spec){
+ if (toMap.get(i) == null)
+ toMap.set(i, spec);
+ else
+ toMap.set(i, toMap.get(i).addSpec(spec));
+ }
+
+ public void addReduceSpec(EvalSpec spec){
+ if (toReduce == null)
+ toReduce = spec;
+ else
+ toReduce = toReduce.addSpec(spec);
+ }
+
+ public void visit(POVisitor v, String prefix) {
+ PrintStream ps = v.getPrintStream();
+
+ ps.println(prefix + "POMapReduce (" + this + "): scope id: " + this.scope);
+ }
+}
+
+
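A minimal usage sketch (illustrative only, not part of this commit) of how a POMapreduce operator is assembled through the methods above and then launched via open(). The FileSpec constructor arguments, the projectSpec/aggSpec EvalSpec instances, and the logicalKey/pigContext variables are assumptions standing in for whatever the plan compiler supplies:

    Map<OperatorKey, ExecPhysicalOperator> opTable = new HashMap<OperatorKey, ExecPhysicalOperator>();
    POMapreduce pom = new POMapreduce("scope", 1L, opTable, logicalKey, pigContext);

    // wire up one input file plus map/reduce specs (the FileSpec ctor args are assumed)
    pom.addInputFile(new FileSpec("/data/input", PigStorage.class.getName()));
    pom.addMapSpec(0, projectSpec);      // hypothetical EvalSpec for the map phase
    pom.addReduceSpec(aggSpec);          // hypothetical EvalSpec for the reduce phase
    pom.outputFileSpec = new FileSpec("/data/output", PigStorage.class.getName());

    // open() first opens any upstream POMapreduce inputs, then submits this job through
    // MapReduceLauncher.launchPig(), throwing IOException if the job fails
    if (pom.open()) {
        System.out.println("ran " + pom.numMRJobs() + " MapReduce job(s)");
    }
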
Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/SplitSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/SplitSpec.java?rev=617338&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/SplitSpec.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/SplitSpec.java Thu Jan 31 19:11:28 2008
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.impl.FunctionInstantiator;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.eval.cond.Cond;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.io.FileLocalizer;
+
+
+public class SplitSpec implements Serializable{
+ private static final long serialVersionUID = 1L;
+
+ public ArrayList<Cond> conditions;
+ public List<String> tempFiles;
+
+ private static String getTempFile(PigContext pigContext) throws IOException {
+ return FileLocalizer.getTemporaryPath(null, pigContext).toString();
+ }
+
+ public SplitSpec(LOSplit lo, PigContext pigContext){
+ this.conditions = lo.getConditions();
+ tempFiles = new ArrayList<String>();
+ try{
+ for (int i=0; i<conditions.size(); i++){
+ tempFiles.add(getTempFile(pigContext));
+ }
+ }catch (IOException e){
+ // propagate instead of silently continuing with fewer temp files than conditions
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder();
+ for (int i=0; i<conditions.size(); i++){
+ if (i!=0) sb.append(";");
+ sb.append(conditions.get(i).toString());
+ sb.append(";");
+ sb.append(tempFiles.get(i));
+ }
+ return sb.toString();
+ }
+
+ public void instantiateFunc(FunctionInstantiator instantiator)
+ throws IOException {
+ for(Cond condition : conditions)
+ condition.instantiateFunc(instantiator);
+ }
+}
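A small sketch of the alternating "condition;tempFile" layout produced by SplitSpec.toString() above; the sample condition strings and temp-file paths are made up for illustration:

    public class SplitSpecFormatDemo {
        public static void main(String[] args) {
            // assumed rendering of two split conditions and their temp files
            String serialized = "$0 > 5;/tmp/pig_temp_42;$0 <= 5;/tmp/pig_temp_43";
            String[] parts = serialized.split(";");
            for (int i = 0; i + 1 < parts.length; i += 2) {
                System.out.println("condition: " + parts[i] + " -> temp file: " + parts[i + 1]);
            }
        }
    }
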
Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java?rev=617338&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java Thu Jan 31 19:11:28 2008
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapreduceExec;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.io.File;
+import java.io.FileOutputStream;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.impl.util.PigLogger;
+import org.apache.pig.backend.hadoop.executionengine.POMapreduce;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.IndexedTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.io.PigFile;
+import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
+import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.pig.backend.datastorage.DataStorageException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.RunningJob;
+
+
+/**
+ * Submits Pig MapReduce jobs to Hadoop. Given a POMapreduce operator, it builds the
+ * JobConf (map, group, combine, reduce and split specs, input/output file specs,
+ * parallelism hints), packages the required UDFs into a job jar, submits the job via
+ * JobClient, tracks map/reduce progress, and collects error messages from failed tasks.
+ *
+ * @author breed
+ *
+ */
+public class MapReduceLauncher {
+
+ public static long totalHadoopTimeSpent = 0;
+ public static int numMRJobs;
+ public static int mrJobNumber;
+
+ public static Configuration config = null;
+ public static HExecutionEngine execEngine = null;
+
+ public static void setConf(Configuration configuration) {
+ config = configuration;
+ }
+
+ public static void setExecEngine(HExecutionEngine executionEngine) {
+ execEngine = executionEngine;
+ }
+
+ public static void initQueryStatus(int numMRJobsIn) {
+ numMRJobs = numMRJobsIn;
+ mrJobNumber = 0;
+ }
+
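+ // Compares serialized Tuples on their raw bytes, avoiding deserialization in the
+ // comparator; used as the output key comparator for jobs that do not need a sort
+ // (see the quantilesFile check in launchPig below).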
+ public static class PigWritableComparator extends WritableComparator {
+ public PigWritableComparator() {
+ super(Tuple.class);
+ }
+
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+ }
+
+ static Random rand = new Random();
+
+ /**
+ * Submit a Pig job to hadoop.
+ *
+ * @param pom
+ * the POMapreduce operator describing the job: its input file specs and map
+ * specs, grouping specs, optional combine/reduce/split specs, output file spec,
+ * and parallelism hints.
+ * @return true if the job completed successfully, false otherwise.
+ * @throws IOException
+ * if submitting or monitoring the job fails with an error.
+ */
+ public boolean launchPig(POMapreduce pom) throws IOException {
+ Logger log = PigLogger.getLogger();
+ JobConf conf = new JobConf(config);
+ conf.setJobName(pom.pigContext.getJobName());
+ boolean success = false;
+ List<String> funcs = new ArrayList<String>();
+
+ if (pom.toMap != null){
+ for (EvalSpec es: pom.toMap)
+ funcs.addAll(es.getFuncs());
+ }
+ if (pom.groupFuncs != null){
+ for(EvalSpec es: pom.groupFuncs)
+ funcs.addAll(es.getFuncs());
+ }
+ if (pom.toReduce != null) {
+ funcs.addAll(pom.toReduce.getFuncs());
+ }
+
+ // create jobs.jar locally and pass it to hadoop
+ File submitJarFile = File.createTempFile("Job", ".jar");
+ try {
+ FileOutputStream fos = new FileOutputStream(submitJarFile);
+ JarManager.createJar(fos, funcs, pom.pigContext);
+ log.debug("Job jar size = " + submitJarFile.length());
+ conf.setJar(submitJarFile.getPath());
+ String user = System.getProperty("user.name");
+ conf.setUser(user != null ? user : "Pigster");
+
+ if (pom.reduceParallelism != -1) {
+ conf.setNumReduceTasks(pom.reduceParallelism);
+ }
+ if (pom.toMap != null) {
+ conf.set("pig.mapFuncs", ObjectSerializer.serialize(pom.toMap));
+ }
+ if (pom.toCombine != null) {
+ conf.set("pig.combineFunc", ObjectSerializer.serialize(pom.toCombine));
+ }
+ if (pom.groupFuncs != null) {
+ conf.set("pig.groupFuncs", ObjectSerializer.serialize(pom.groupFuncs));
+ }
+ if (pom.toReduce != null) {
+ conf.set("pig.reduceFunc", ObjectSerializer.serialize(pom.toReduce));
+ }
+ if (pom.toSplit != null) {
+ conf.set("pig.splitSpec", ObjectSerializer.serialize(pom.toSplit));
+ }
+ if (pom.pigContext != null) {
+ conf.set("pig.pigContext", ObjectSerializer.serialize(pom.pigContext));
+ }
+ conf.setMapRunnerClass(PigMapReduce.class);
+ if (pom.toCombine != null) {
+ conf.setCombinerClass(PigCombine.class);
+ //conf.setCombinerClass(PigMapReduce.class);
+ }
+ if (pom.quantilesFile!=null){
+ conf.set("pig.quantilesFile", pom.quantilesFile);
+ }
+ else{
+ // this is not a sort job - can use byte comparison to speed up processing
+ conf.setOutputKeyComparatorClass(PigWritableComparator.class);
+ }
+ if (pom.partitionFunction!=null){
+ conf.setPartitionerClass(SortPartitioner.class);
+ }
+ conf.setReducerClass(PigMapReduce.class);
+ conf.setInputFormat(PigInputFormat.class);
+ conf.setOutputFormat(PigOutputFormat.class);
+ // not used starting with 0.15 conf.setInputKeyClass(Text.class);
+ // not used starting with 0.15 conf.setInputValueClass(Tuple.class);
+ conf.setOutputKeyClass(Tuple.class);
+ if (pom.userComparator != null) {
+ conf.setOutputKeyComparatorClass(pom.userComparator);
+ }
+ conf.setOutputValueClass(IndexedTuple.class);
+ conf.set("pig.inputs", ObjectSerializer.serialize(pom.inputFileSpecs));
+
+ conf.setOutputPath(new Path(pom.outputFileSpec.getFileName()));
+ conf.set("pig.storeFunc", pom.outputFileSpec.getFuncSpec());
+
+ //
+ // Now, actually submit the job (using the submit name)
+ //
+ JobClient jobClient = execEngine.getJobClient();
+ RunningJob status = jobClient.submitJob(conf);
+ log.debug("submitted job: " + status.getJobID());
+
+ long sleepTime = 1000;
+ double lastQueryProgress = -1.0;
+ int lastJobsQueued = -1;
+ double lastMapProgress = -1.0;
+ double lastReduceProgress = -1.0;
+ while (true) {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (Exception e) {}
+
+ if (status.isComplete()) {
+ success = status.isSuccessful();
+ log.debug("Job finished " + (success ? "" : "un") + "successfully");
+ if (success) {
+ mrJobNumber++;
+ }
+ double queryProgress = ((double) mrJobNumber) / ((double) numMRJobs);
+ if (queryProgress > lastQueryProgress) {
+ log.info("Pig progress = " + ((int) (queryProgress * 100)) + "%");
+ lastQueryProgress = queryProgress;
+ }
+ break;
+ }
+ else // still running
+ {
+ double mapProgress = status.mapProgress();
+ double reduceProgress = status.reduceProgress();
+ if (lastMapProgress != mapProgress || lastReduceProgress != reduceProgress) {
+ log.debug("Hadoop job progress: Map=" + (int) (mapProgress * 100) + "% Reduce="
+ + (int) (reduceProgress * 100) + "%");
+ lastMapProgress = mapProgress;
+ lastReduceProgress = reduceProgress;
+ }
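+ // overall progress = (jobs already completed + this job's average map/reduce progress) / total MR jobs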
+ double numJobsCompleted = mrJobNumber;
+ double thisJobProgress = (mapProgress + reduceProgress) / 2.0;
+ double queryProgress = (numJobsCompleted + thisJobProgress) / ((double) numMRJobs);
+ if (queryProgress > lastQueryProgress) {
+ log.info("Pig progress = " + ((int) (queryProgress * 100)) + "%");
+ lastQueryProgress = queryProgress;
+ }
+ }
+ }
+
+ // bug 1030028: if the input file is empty, hadoop doesn't create the output file!
+ Path outputFile = conf.getOutputPath();
+ String outputName = outputFile.getName();
+ int colon = outputName.indexOf(':');
+ if (colon != -1) {
+ outputFile = new Path(outputFile.getParent(), outputName.substring(0, colon));
+ }
+
+ try {
+ ElementDescriptor descriptor =
+ ((HDataStorage)(pom.pigContext.getDfs())).asElement(outputFile.toString());
+
+ if (success && !descriptor.exists()) {
+
+ // create an empty output file
+ PigFile f = new PigFile(outputFile.toString(), false);
+ f.store(BagFactory.getInstance().newDefaultBag(),
+ new PigStorage(),
+ pom.pigContext);
+ }
+ }
+ catch (DataStorageException e) {
+ throw new IOException("Failed to obtain descriptor for " + outputFile.toString(), e);
+ }
+
+ if (!success) {
+ // go find the error messages
+ getErrorMessages(jobClient.getMapTaskReports(status.getJobID()),
+ "map", log);
+ getErrorMessages(jobClient.getReduceTaskReports(status.getJobID()),
+ "reduce", log);
+ }
+ else {
+ long timeSpent = 0;
+
+ // NOTE: this call is crashing due to a bug in Hadoop; the bug is known and the patch has not been applied yet.
+ TaskReport[] mapReports = jobClient.getMapTaskReports(status.getJobID());
+ TaskReport[] reduceReports = jobClient.getReduceTaskReports(status.getJobID());
+ for (TaskReport r : mapReports) {
+ timeSpent += (r.getFinishTime() - r.getStartTime());
+ }
+ for (TaskReport r : reduceReports) {
+ timeSpent += (r.getFinishTime() - r.getStartTime());
+ }
+ totalHadoopTimeSpent += timeSpent;
+ }
+ }
+ catch (Exception e) {
+ // Do we need different handling for different exceptions?
+ e.printStackTrace();
+ throw new IOException(e.getMessage());
+ }
+ finally {
+ submitJarFile.delete();
+ }
+ return success;
+ }
+
+ private void getErrorMessages(TaskReport[] reports, String type, Logger log) {
+ for (int i = 0; i < reports.length; i++) {
+ String[] msgs = reports[i].getDiagnostics();
+ StringBuilder sb = new StringBuilder("Error message from task (" + type + ") " +
+ reports[i].getTaskId());
+ for (int j = 0; j < msgs.length; j++) {
+ sb.append(" " + msgs[j]);
+ }
+ log.error(sb.toString());
+ }
+ }
+
+}
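A minimal sketch (not part of this commit) of the wiring the Hadoop execution engine is expected to perform before a query runs, using only the static hooks defined above; engine and rootJob stand in for an initialized HExecutionEngine and the root POMapreduce operator:

    static boolean runQuery(Configuration conf, HExecutionEngine engine, POMapreduce rootJob)
            throws IOException {
        MapReduceLauncher.setConf(conf);                         // Hadoop configuration cloned into each JobConf
        MapReduceLauncher.setExecEngine(engine);                 // supplies the JobClient used for submission
        MapReduceLauncher.initQueryStatus(rootJob.numMRJobs());  // total MR jobs, for progress reporting
        return rootJob.open();  // open() recursively launches upstream jobs, then this one
    }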