Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:36 UTC

[34/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
new file mode 100644
index 0000000..93926c1
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+
+/**
+ * This class encapsulates a MapReduce job and its dependencies. It monitors
+ * the states of the depending jobs and updates the state of this job. A job
+ * starts in the WAITING state. If it does not have any depending jobs, or if
+ * all of its depending jobs are in the SUCCESS state, then the job state
+ * becomes READY. If any depending job fails, this job fails as well. When in
+ * the READY state, the job can be submitted to Hadoop for execution, with the
+ * state changing to RUNNING. From RUNNING, the job can move to the SUCCESS or
+ * FAILED state, depending on the status of the job execution.
+ */
+public class CrunchControlledJob {
+
+  // A job will be in one of the following states
+  public static enum State {
+    SUCCESS, WAITING, RUNNING, READY, FAILED, DEPENDENT_FAILED
+  };
+
+  public static interface Hook {
+    public void run() throws IOException;
+  }
+
+  private static final Log LOG = LogFactory.getLog(CrunchControlledJob.class);
+
+  private final int jobID;
+  private final Job job; // mapreduce job to be executed.
+  // the jobs the current job depends on
+  private final List<CrunchControlledJob> dependingJobs;
+  private final Hook prepareHook;
+  private final Hook completionHook;
+  private State state;
+  // some info for human consumption, e.g. the reason why the job failed
+  private String message;
+  private String lastKnownProgress;
+
+  /**
+   * Construct a job.
+   *
+   * @param jobID
+   *          an ID used to match with its {@link org.apache.crunch.impl.mr.plan.JobPrototype}.
+   * @param job
+   *          a mapreduce job to be executed.
+   * @param prepareHook
+   *          a piece of code that will run before this job is submitted.
+   * @param completionHook
+   *          a piece of code that will run after this job completes.
+   */
+  public CrunchControlledJob(int jobID, Job job, Hook prepareHook, Hook completionHook) {
+    this.jobID = jobID;
+    this.job = job;
+    this.dependingJobs = Lists.newArrayList();
+    this.prepareHook = prepareHook;
+    this.completionHook = completionHook;
+    this.state = State.WAITING;
+    this.message = "just initialized";
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("job name:\t").append(this.job.getJobName()).append("\n");
+    sb.append("job id:\t").append(this.jobID).append("\n");
+    sb.append("job state:\t").append(this.state).append("\n");
+    sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n");
+    sb.append("job message:\t").append(this.message).append("\n");
+
+    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
+      sb.append("job has no depending job:\t").append("\n");
+    } else {
+      sb.append("job has ").append(this.dependingJobs.size())
+          .append(" dependeng jobs:\n");
+      for (int i = 0; i < this.dependingJobs.size(); i++) {
+        sb.append("\t depending job ").append(i).append(":\t");
+        sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
+      }
+    }
+    return sb.toString();
+  }
+
+  /**
+   * @return the job name of this job
+   */
+  public String getJobName() {
+    return job.getJobName();
+  }
+
+  /**
+   * Set the job name for this job.
+   *
+   * @param jobName
+   *          the job name
+   */
+  public void setJobName(String jobName) {
+    job.setJobName(jobName);
+  }
+
+  /**
+   * @return the job ID of this job
+   */
+  public int getJobID() {
+    return this.jobID;
+  }
+
+  /**
+   * @return the mapred ID of this job as assigned by the mapred framework.
+   */
+  public JobID getMapredJobID() {
+    return this.job.getJobID();
+  }
+
+  /**
+   * @return the mapreduce job
+   */
+  public synchronized Job getJob() {
+    return this.job;
+  }
+
+  /**
+   * @return the state of this job
+   */
+  public synchronized State getJobState() {
+    return this.state;
+  }
+
+  /**
+   * Set the state for this job.
+   * 
+   * @param state
+   *          the new state for this job.
+   */
+  protected synchronized void setJobState(State state) {
+    this.state = state;
+  }
+
+  /**
+   * @return the message of this job
+   */
+  public synchronized String getMessage() {
+    return this.message;
+  }
+
+  /**
+   * Set the message for this job.
+   * 
+   * @param message
+   *          the message for this job.
+   */
+  public synchronized void setMessage(String message) {
+    this.message = message;
+  }
+
+  /**
+   * @return the depending jobs of this job
+   */
+  public List<CrunchControlledJob> getDependentJobs() {
+    return this.dependingJobs;
+  }
+
+  /**
+   * Add a job to this job's dependency list. Dependent jobs can only be added
+   * while a Job is waiting to run, not during or afterwards.
+   * 
+   * @param dependingJob
+   *          Job that this Job depends on.
+   * @return <tt>true</tt> if the Job was added.
+   */
+  public synchronized boolean addDependingJob(CrunchControlledJob dependingJob) {
+    if (this.state == State.WAITING) { // only allowed to add jobs when waiting
+      return this.dependingJobs.add(dependingJob);
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * @return true if this job is in a complete state
+   */
+  public synchronized boolean isCompleted() {
+    return this.state == State.FAILED || this.state == State.DEPENDENT_FAILED
+        || this.state == State.SUCCESS;
+  }
+
+  /**
+   * @return true if this job is in READY state
+   */
+  public synchronized boolean isReady() {
+    return this.state == State.READY;
+  }
+
+  public void killJob() throws IOException, InterruptedException {
+    job.killJob();
+  }
+
+  /**
+   * Check the state of this running job. The state may remain the same, or it
+   * may become SUCCESS or FAILED.
+   */
+  private void checkRunningState() throws IOException, InterruptedException {
+    try {
+      if (job.isComplete()) {
+        if (job.isSuccessful()) {
+          this.state = State.SUCCESS;
+        } else {
+          this.state = State.FAILED;
+          this.message = "Job failed!";
+        }
+      } else {
+        // still running
+        if (job.getConfiguration().getBoolean(RuntimeParameters.LOG_JOB_PROGRESS, false)) {
+          logJobProgress();
+        }
+      }
+    } catch (IOException ioe) {
+      this.state = State.FAILED;
+      this.message = StringUtils.stringifyException(ioe);
+      try {
+        if (job != null) {
+          job.killJob();
+        }
+      } catch (IOException e) {
+        // Ignore: the job is already marked FAILED, and the kill is best-effort.
+      }
+    }
+    if (isCompleted()) {
+      completionHook.run();
+    }
+  }
+
+  /**
+   * Check and update the state of this job. The state changes depending on its
+   * current state and the states of the depending jobs.
+   */
+  synchronized State checkState() throws IOException, InterruptedException {
+    if (this.state == State.RUNNING) {
+      checkRunningState();
+    }
+    if (this.state != State.WAITING) {
+      return this.state;
+    }
+    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
+      this.state = State.READY;
+      return this.state;
+    }
+    CrunchControlledJob pred = null;
+    int n = this.dependingJobs.size();
+    for (int i = 0; i < n; i++) {
+      pred = this.dependingJobs.get(i);
+      State s = pred.checkState();
+      if (s == State.WAITING || s == State.READY || s == State.RUNNING) {
+        break; // a pred is still not completed, continue in WAITING state
+      }
+      if (s == State.FAILED || s == State.DEPENDENT_FAILED) {
+        this.state = State.DEPENDENT_FAILED;
+        this.message = "depending job " + i + " with jobID " + pred.getJobID()
+            + " failed. " + pred.getMessage();
+        break;
+      }
+      // pred must be in success state
+      if (i == n - 1) {
+        this.state = State.READY;
+      }
+    }
+
+    return this.state;
+  }
+
+  /**
+   * Submit this job to mapred. The state becomes RUNNING if submission is
+   * successful, FAILED otherwise.
+   */
+  protected synchronized void submit() {
+    try {
+      prepareHook.run();
+      job.submit();
+      this.state = State.RUNNING;
+      LOG.info("Running job \"" + getJobName() + "\"");
+      LOG.info("Job status available at: " + job.getTrackingURL());
+    } catch (Exception ioe) {
+      this.state = State.FAILED;
+      this.message = StringUtils.stringifyException(ioe);
+      LOG.info("Error occurred starting job \"" + getJobName() + "\":");
+      LOG.info(getMessage());
+    }
+  }
+
+  private void logJobProgress() throws IOException, InterruptedException {
+    String progress = String.format("map %.0f%% reduce %.0f%%",
+        100.0 * job.mapProgress(), 100.0 * job.reduceProgress());
+    if (!Objects.equal(lastKnownProgress, progress)) {
+      LOG.info(job.getJobName() + " progress: " + progress);
+      lastKnownProgress = progress;
+    }
+  }
+}
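
A minimal usage sketch (not part of the patch) of how these pieces compose:
two controlled jobs wired together with a dependency. The no-op hooks and the
bare Job instances are assumptions for illustration; real callers pass fully
configured jobs and meaningful prepare/completion hooks.

    import java.io.IOException;
    import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class ControlledJobSketch {
      public static void main(String[] args) throws IOException {
        // A do-nothing Hook; real hooks materialize inputs or clean up outputs.
        CrunchControlledJob.Hook noOp = new CrunchControlledJob.Hook() {
          @Override
          public void run() throws IOException {
          }
        };
        Job first = new Job(new Configuration());
        Job second = new Job(new Configuration());
        CrunchControlledJob upstream = new CrunchControlledJob(1, first, noOp, noOp);
        CrunchControlledJob downstream = new CrunchControlledJob(2, second, noOp, noOp);
        // Dependencies may only be added while the job is still WAITING.
        downstream.addDependingJob(upstream);
      }
    }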

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
new file mode 100644
index 0000000..727ab6f
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.State;
+
+/**
+ * This class encapsulates a set of MapReduce jobs and their dependencies.
+ *
+ * It tracks the states of the jobs by placing them into different tables
+ * according to their states.
+ *
+ * This class provides APIs for the client app to add a job to the group and
+ * to get the jobs in the group in different states. Each job is identified by
+ * the ID that was assigned to it when it was constructed.
+ */
+public class CrunchJobControl {
+
+  private Map<Integer, CrunchControlledJob> waitingJobs;
+  private Map<Integer, CrunchControlledJob> readyJobs;
+  private Map<Integer, CrunchControlledJob> runningJobs;
+  private Map<Integer, CrunchControlledJob> successfulJobs;
+  private Map<Integer, CrunchControlledJob> failedJobs;
+
+  private Log log = LogFactory.getLog(CrunchJobControl.class);
+
+  private final String groupName;
+
+  /**
+   * Construct a job control for a group of jobs.
+   * 
+   * @param groupName
+   *          a name identifying this group
+   */
+  public CrunchJobControl(String groupName) {
+    this.waitingJobs = new Hashtable<Integer, CrunchControlledJob>();
+    this.readyJobs = new Hashtable<Integer, CrunchControlledJob>();
+    this.runningJobs = new Hashtable<Integer, CrunchControlledJob>();
+    this.successfulJobs = new Hashtable<Integer, CrunchControlledJob>();
+    this.failedJobs = new Hashtable<Integer, CrunchControlledJob>();
+    this.groupName = groupName;
+  }
+
+  private static List<CrunchControlledJob> toList(Map<Integer, CrunchControlledJob> jobs) {
+    ArrayList<CrunchControlledJob> retv = new ArrayList<CrunchControlledJob>();
+    synchronized (jobs) {
+      for (CrunchControlledJob job : jobs.values()) {
+        retv.add(job);
+      }
+    }
+    return retv;
+  }
+
+  /**
+   * @return the jobs in the waiting state
+   */
+  public List<CrunchControlledJob> getWaitingJobList() {
+    return toList(this.waitingJobs);
+  }
+
+  /**
+   * @return the jobs in the running state
+   */
+  public List<CrunchControlledJob> getRunningJobList() {
+    return toList(this.runningJobs);
+  }
+
+  /**
+   * @return the jobs in the ready state
+   */
+  public List<CrunchControlledJob> getReadyJobsList() {
+    return toList(this.readyJobs);
+  }
+
+  /**
+   * @return the jobs in the success state
+   */
+  public List<CrunchControlledJob> getSuccessfulJobList() {
+    return toList(this.successfulJobs);
+  }
+
+  public List<CrunchControlledJob> getFailedJobList() {
+    return toList(this.failedJobs);
+  }
+
+  private static void addToQueue(CrunchControlledJob aJob,
+      Map<Integer, CrunchControlledJob> queue) {
+    synchronized (queue) {
+      queue.put(aJob.getJobID(), aJob);
+    }
+  }
+
+  private void addToQueue(CrunchControlledJob aJob) {
+    Map<Integer, CrunchControlledJob> queue = getQueue(aJob.getJobState());
+    addToQueue(aJob, queue);
+  }
+
+  private Map<Integer, CrunchControlledJob> getQueue(State state) {
+    Map<Integer, CrunchControlledJob> retv = null;
+    if (state == State.WAITING) {
+      retv = this.waitingJobs;
+    } else if (state == State.READY) {
+      retv = this.readyJobs;
+    } else if (state == State.RUNNING) {
+      retv = this.runningJobs;
+    } else if (state == State.SUCCESS) {
+      retv = this.successfulJobs;
+    } else if (state == State.FAILED || state == State.DEPENDENT_FAILED) {
+      retv = this.failedJobs;
+    }
+    return retv;
+  }
+
+  /**
+   * Add a new job.
+   * 
+   * @param aJob
+   *          the new job
+   */
+  synchronized public void addJob(CrunchControlledJob aJob) {
+    aJob.setJobState(State.WAITING);
+    this.addToQueue(aJob);
+  }
+
+  synchronized private void checkRunningJobs() throws IOException,
+      InterruptedException {
+
+    Map<Integer, CrunchControlledJob> oldJobs = null;
+    oldJobs = this.runningJobs;
+    this.runningJobs = new Hashtable<Integer, CrunchControlledJob>();
+
+    for (CrunchControlledJob nextJob : oldJobs.values()) {
+      nextJob.checkState();
+      this.addToQueue(nextJob);
+    }
+  }
+
+  synchronized private void checkWaitingJobs() throws IOException,
+      InterruptedException {
+    Map<Integer, CrunchControlledJob> oldJobs = null;
+    oldJobs = this.waitingJobs;
+    this.waitingJobs = new Hashtable<Integer, CrunchControlledJob>();
+
+    for (CrunchControlledJob nextJob : oldJobs.values()) {
+      nextJob.checkState();
+      this.addToQueue(nextJob);
+    }
+  }
+
+  synchronized private void startReadyJobs() {
+    Map<Integer, CrunchControlledJob> oldJobs = null;
+    oldJobs = this.readyJobs;
+    this.readyJobs = new Hashtable<Integer, CrunchControlledJob>();
+
+    for (CrunchControlledJob nextJob : oldJobs.values()) {
+      // Submitting Job to Hadoop
+      nextJob.submit();
+      this.addToQueue(nextJob);
+    }
+  }
+
+  synchronized public void killAllRunningJobs() {
+    for (CrunchControlledJob job : runningJobs.values()) {
+      if (!job.isCompleted()) {
+        try {
+          job.killJob();
+        } catch (Exception e) {
+          log.error("Exception killing job: " + job.getJobName(), e);
+        }
+      }
+    }
+  }
+
+  synchronized public boolean allFinished() {
+    return this.waitingJobs.size() == 0 && this.readyJobs.size() == 0
+        && this.runningJobs.size() == 0;
+  }
+
+  /**
+   * Checks the states of the running jobs, updates the states of the waiting
+   * jobs, and submits the jobs in the READY state (i.e. those whose
+   * dependencies have all completed successfully).
+   */
+  public void pollJobStatusAndStartNewOnes() throws IOException, InterruptedException {
+    checkRunningJobs();
+    checkWaitingJobs();
+    startReadyJobs();
+  }
+}
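
To make the polling contract concrete, here is a hedged sketch of the driver
loop this class implies. The group name, poll interval, and error handling
are assumptions, not part of the patch.

    import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
    import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl;

    public class JobControlSketch {
      public static void runGroup(Iterable<CrunchControlledJob> jobs)
          throws Exception {
        CrunchJobControl control = new CrunchJobControl("example-group");
        for (CrunchControlledJob job : jobs) {
          control.addJob(job); // resets the job to WAITING and queues it
        }
        while (!control.allFinished()) {
          control.pollJobStatusAndStartNewOnes();
          Thread.sleep(1000L); // hypothetical poll interval
        }
        if (!control.getFailedJobList().isEmpty()) {
          control.killAllRunningJobs(); // defensive; nothing should be running here
          throw new RuntimeException(
              control.getFailedJobList().size() + " job(s) failed");
        }
      }
    }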

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/SingleUseIterable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/SingleUseIterable.java b/crunch-core/src/main/java/org/apache/crunch/impl/SingleUseIterable.java
new file mode 100644
index 0000000..98f982f
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/SingleUseIterable.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl;
+
+import java.util.Iterator;
+
+/**
+ * Wrapper around a Reducer's input Iterable. Ensures that the
+ * {@link #iterator()} method is not called more than once.
+ */
+public class SingleUseIterable<T> implements Iterable<T> {
+
+  private boolean used = false;
+  private Iterable<T> wrappedIterable;
+
+  /**
+   * Instantiate around an Iterable that may only be used once.
+   * 
+   * @param toWrap iterable to wrap
+   */
+  public SingleUseIterable(Iterable<T> toWrap) {
+    this.wrappedIterable = toWrap;
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+    if (used) {
+      throw new IllegalStateException("iterator() can only be called once on this Iterable");
+    }
+    used = true;
+    return wrappedIterable.iterator();
+  }
+
+}
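
A short sketch of the guard in action; the list contents are arbitrary:

    import java.util.Arrays;
    import org.apache.crunch.impl.SingleUseIterable;

    public class SingleUseSketch {
      public static void main(String[] args) {
        SingleUseIterable<String> once =
            new SingleUseIterable<String>(Arrays.asList("a", "b"));
        for (String s : once) {
          System.out.println(s); // the first pass works normally
        }
        // Calling once.iterator() again would throw IllegalStateException.
      }
    }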

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
new file mode 100644
index 0000000..272b2af
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mem;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineExecution;
+import org.apache.crunch.PipelineResult;
+import org.apache.crunch.Source;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.Target;
+import org.apache.crunch.Target.WriteMode;
+import org.apache.crunch.impl.mem.collect.MemCollection;
+import org.apache.crunch.impl.mem.collect.MemTable;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class MemPipeline implements Pipeline {
+
+  private static final Log LOG = LogFactory.getLog(MemPipeline.class);
+  private static Counters COUNTERS = new Counters();
+  private static final MemPipeline INSTANCE = new MemPipeline();
+
+  private int outputIndex = 0;
+  
+  public static Counters getCounters() {
+    return COUNTERS;
+  }
+  
+  public static void clearCounters() {
+    COUNTERS = new Counters();
+  }
+
+  public static Pipeline getInstance() {
+    return INSTANCE;
+  }
+
+  public static <T> PCollection<T> collectionOf(T... ts) {
+    return new MemCollection<T>(ImmutableList.copyOf(ts));
+  }
+
+  public static <T> PCollection<T> collectionOf(Iterable<T> collect) {
+    return new MemCollection<T>(collect);
+  }
+
+  public static <T> PCollection<T> typedCollectionOf(PType<T> ptype, T... ts) {
+    return new MemCollection<T>(ImmutableList.copyOf(ts), ptype, null);
+  }
+
+  public static <T> PCollection<T> typedCollectionOf(PType<T> ptype, Iterable<T> collect) {
+    return new MemCollection<T>(collect, ptype, null);
+  }
+
+  public static <S, T> PTable<S, T> tableOf(S s, T t, Object... more) {
+    List<Pair<S, T>> pairs = Lists.newArrayList();
+    pairs.add(Pair.of(s, t));
+    for (int i = 0; i < more.length; i += 2) {
+      pairs.add(Pair.of((S) more[i], (T) more[i + 1]));
+    }
+    return new MemTable<S, T>(pairs);
+  }
+
+  public static <S, T> PTable<S, T> typedTableOf(PTableType<S, T> ptype, S s, T t, Object... more) {
+    List<Pair<S, T>> pairs = Lists.newArrayList();
+    pairs.add(Pair.of(s, t));
+    for (int i = 0; i < more.length; i += 2) {
+      pairs.add(Pair.of((S) more[i], (T) more[i + 1]));
+    }
+    return new MemTable<S, T>(pairs, ptype, null);
+  }
+
+  public static <S, T> PTable<S, T> tableOf(Iterable<Pair<S, T>> pairs) {
+    return new MemTable<S, T>(pairs);
+  }
+
+  public static <S, T> PTable<S, T> typedTableOf(PTableType<S, T> ptype, Iterable<Pair<S, T>> pairs) {
+    return new MemTable<S, T>(pairs, ptype, null);
+  }
+
+  private Configuration conf = new Configuration();
+  private Set<Target> activeTargets = Sets.newHashSet();
+  
+  private MemPipeline() {
+  }
+
+  @Override
+  public void setConfiguration(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public <T> PCollection<T> read(Source<T> source) {
+    if (source instanceof ReadableSource) {
+      try {
+        Iterable<T> iterable = ((ReadableSource<T>) source).read(conf);
+        return new MemCollection<T>(iterable, source.getType(), source.toString());
+      } catch (IOException e) {
+        LOG.error("Exception reading source: " + source.toString(), e);
+        throw new IllegalStateException(e);
+      }
+    }
+    LOG.error("Source " + source + " is not readable");
+    throw new IllegalStateException("Source " + source + " is not readable");
+  }
+
+  @Override
+  public <K, V> PTable<K, V> read(TableSource<K, V> source) {
+    if (source instanceof ReadableSource) {
+      try {
+        Iterable<Pair<K, V>> iterable = ((ReadableSource<Pair<K, V>>) source).read(conf);
+        return new MemTable<K, V>(iterable, source.getTableType(), source.toString());
+      } catch (IOException e) {
+        LOG.error("Exception reading source: " + source.toString(), e);
+        throw new IllegalStateException(e);
+      }
+    }
+    LOG.error("Source " + source + " is not readable");
+    throw new IllegalStateException("Source " + source + " is not readable");
+  }
+
+  @Override
+  public void write(PCollection<?> collection, Target target) {
+    write(collection, target, Target.WriteMode.DEFAULT);
+  }
+  
+  @Override
+  public void write(PCollection<?> collection, Target target,
+      Target.WriteMode writeMode) {
+    target.handleExisting(writeMode, getConfiguration());
+    if (writeMode != WriteMode.APPEND && activeTargets.contains(target)) {
+      throw new CrunchRuntimeException("Target " + target + " is already written in the current run." +
+          " Use WriteMode.APPEND in order to write additional data to it.");
+    }
+    activeTargets.add(target);
+    if (target instanceof PathTarget) {
+      Path path = ((PathTarget) target).getPath();
+      try {
+        FileSystem fs = path.getFileSystem(conf);
+        FSDataOutputStream os = fs.create(new Path(path, "out" + outputIndex));
+        outputIndex++;
+        if (collection instanceof PTable) {
+          for (Object o : collection.materialize()) {
+            Pair p = (Pair) o;
+            os.writeBytes(p.first().toString());
+            os.writeBytes("\t");
+            os.writeBytes(p.second().toString());
+            os.writeBytes("\r\n");
+          }
+        } else {
+          for (Object o : collection.materialize()) {
+            os.writeBytes(o.toString() + "\r\n");
+          }
+        }
+        os.close();
+      } catch (IOException e) {
+        LOG.error("Exception writing target: " + target, e);
+      }
+    } else {
+      LOG.error("Target " + target + " is not a PathTarget instance");
+    }
+  }
+
+  @Override
+  public PCollection<String> readTextFile(String pathName) {
+    return read(At.textFile(pathName));
+  }
+
+  @Override
+  public <T> void writeTextFile(PCollection<T> collection, String pathName) {
+    write(collection, At.textFile(pathName));
+  }
+
+  @Override
+  public <T> Iterable<T> materialize(PCollection<T> pcollection) {
+    return pcollection.materialize();
+  }
+
+  @Override
+  public PipelineExecution runAsync() {
+    activeTargets.clear();
+    return new PipelineExecution() {
+      @Override
+      public String getPlanDotFile() {
+        return "";
+      }
+
+      @Override
+      public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException {
+        // no-op
+      }
+
+      @Override
+      public void waitUntilDone() throws InterruptedException {
+        // no-op
+      }
+
+      @Override
+      public Status getStatus() {
+        return Status.SUCCEEDED;
+      }
+
+      @Override
+      public PipelineResult getResult() {
+        return new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage", COUNTERS)));
+      }
+
+      @Override
+      public void kill() {
+      }
+    };
+  }
+  
+  @Override
+  public PipelineResult run() {
+    activeTargets.clear();
+    return new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage", COUNTERS)));
+  }
+
+  @Override
+  public PipelineResult done() {
+    return run();
+  }
+
+  @Override
+  public void enableDebug() {
+    LOG.info("Note: in-memory pipelines do not have debug logging");
+  }
+
+  @Override
+  public String getName() {
+    return "Memory Pipeline";
+  }
+}
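
A small sketch of exercising the in-memory pipeline end to end; the input
values are made up:

    import org.apache.crunch.PCollection;
    import org.apache.crunch.impl.mem.MemPipeline;

    public class MemPipelineSketch {
      public static void main(String[] args) {
        PCollection<String> lines = MemPipeline.collectionOf("one", "two", "three");
        for (String line : lines.materialize()) {
          System.out.println(line); // in-memory, so materialize() is immediate
        }
        MemPipeline.getInstance().done(); // returns a synthetic PipelineResult
      }
    }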

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
new file mode 100644
index 0000000..c97fac6
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mem.collect;
+
+import java.lang.reflect.Method;
+import java.util.Collection;
+
+import javassist.util.proxy.MethodFilter;
+import javassist.util.proxy.MethodHandler;
+import javassist.util.proxy.ProxyFactory;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.FilterFn;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PObject;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
+import org.apache.crunch.fn.ExtractKeyFn;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mem.emit.InMemoryEmitter;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.materialize.pobject.CollectionPObject;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public class MemCollection<S> implements PCollection<S> {
+
+  private final Collection<S> collect;
+  private final PType<S> ptype;
+  private String name;
+
+  public MemCollection(Iterable<S> collect) {
+    this(collect, null, null);
+  }
+
+  public MemCollection(Iterable<S> collect, PType<S> ptype) {
+    this(collect, ptype, null);
+  }
+
+  public MemCollection(Iterable<S> collect, PType<S> ptype, String name) {
+    this.collect = ImmutableList.copyOf(collect);
+    this.ptype = ptype;
+    this.name = name;
+  }
+
+  @Override
+  public Pipeline getPipeline() {
+    return MemPipeline.getInstance();
+  }
+
+  @Override
+  public PCollection<S> union(PCollection<S> other) {
+    return union(new PCollection[] { other });
+  }
+  
+  @Override
+  public PCollection<S> union(PCollection<S>... collections) {
+    Collection<S> output = Lists.newArrayList();
+    for (PCollection<S> pcollect : collections) {
+      for (S s : pcollect.materialize()) {
+        output.add(s);
+      }
+    }
+    output.addAll(collect);
+    return new MemCollection<S>(output, collections[0].getPType());
+  }
+
+  @Override
+  public <T> PCollection<T> parallelDo(DoFn<S, T> doFn, PType<T> type) {
+    return parallelDo(null, doFn, type);
+  }
+
+  @Override
+  public <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type) {
+    return parallelDo(name, doFn, type, ParallelDoOptions.builder().build());
+  }
+  
+  @Override
+  public <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type,
+      ParallelDoOptions options) {
+    InMemoryEmitter<T> emitter = new InMemoryEmitter<T>();
+    doFn.setContext(getInMemoryContext(getPipeline().getConfiguration()));
+    doFn.initialize();
+    for (S s : collect) {
+      doFn.process(s, emitter);
+    }
+    doFn.cleanup(emitter);
+    return new MemCollection<T>(emitter.getOutput(), type, name);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type) {
+    return parallelDo(null, doFn, type);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type) {
+    return parallelDo(name, doFn, type, ParallelDoOptions.builder().build());
+  }
+  
+  @Override
+  public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type,
+      ParallelDoOptions options) {
+    InMemoryEmitter<Pair<K, V>> emitter = new InMemoryEmitter<Pair<K, V>>();
+    doFn.setContext(getInMemoryContext(getPipeline().getConfiguration()));
+    doFn.initialize();
+    for (S s : collect) {
+      doFn.process(s, emitter);
+    }
+    doFn.cleanup(emitter);
+    return new MemTable<K, V>(emitter.getOutput(), type, name);
+  }
+
+  @Override
+  public PCollection<S> write(Target target) {
+    getPipeline().write(this, target);
+    return this;
+  }
+
+  @Override
+  public PCollection<S> write(Target target, Target.WriteMode writeMode) {
+    getPipeline().write(this, target, writeMode);
+    return this;
+  }
+
+  @Override
+  public Iterable<S> materialize() {
+    return collect;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public PObject<Collection<S>> asCollection() {
+    return new CollectionPObject<S>(this);
+  }
+
+  public Collection<S> getCollection() {
+    return collect;
+  }
+
+  @Override
+  public PType<S> getPType() {
+    return ptype;
+  }
+
+  @Override
+  public PTypeFamily getTypeFamily() {
+    if (ptype != null) {
+      return ptype.getFamily();
+    }
+    return null;
+  }
+
+  @Override
+  public long getSize() {
+    return collect.isEmpty() ? 0 : 1; // getSize is only used for pipeline optimization in MR
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public String toString() {
+    return collect.toString();
+  }
+
+  @Override
+  public PTable<S, Long> count() {
+    return Aggregate.count(this);
+  }
+
+  @Override
+  public PObject<Long> length() {
+    return Aggregate.length(this);
+  }
+
+  @Override
+  public PObject<S> max() {
+    return Aggregate.max(this);
+  }
+
+  @Override
+  public PObject<S> min() {
+    return Aggregate.min(this);
+  }
+
+  @Override
+  public PCollection<S> filter(FilterFn<S> filterFn) {
+    return parallelDo(filterFn, getPType());
+  }
+
+  @Override
+  public PCollection<S> filter(String name, FilterFn<S> filterFn) {
+    return parallelDo(name, filterFn, getPType());
+  }
+
+  @Override
+  public <K> PTable<K, S> by(MapFn<S, K> mapFn, PType<K> keyType) {
+    return parallelDo(new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
+  }
+
+  @Override
+  public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) {
+    return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
+  }
+
+  /**
+   * Creates a {@link TaskInputOutputContext} that provides just a
+   * {@linkplain Configuration}. The method is implemented with javassist
+   * because the API changed between Hadoop versions: in Hadoop 1.0.3,
+   * {@linkplain TaskInputOutputContext} is an abstract class, while in
+   * version 2 it is an interface.
+   * <p>
+   * Note: the intention here is to provide only the bare essentials required
+   * to make the {@linkplain MemPipeline} work. It lacks even the basics that
+   * could provide fuller support for unit testing a pipeline.
+   */
+  private static TaskInputOutputContext<?, ?, ?, ?> getInMemoryContext(final Configuration conf) {
+    ProxyFactory factory = new ProxyFactory();
+    Class<TaskInputOutputContext> superType = TaskInputOutputContext.class;
+    Class[] types = new Class[0];
+    Object[] args = new Object[0];
+    if (superType.isInterface()) {
+      factory.setInterfaces(new Class[] { superType });
+    } else {
+      types = new Class[] { Configuration.class, TaskAttemptID.class, RecordWriter.class, OutputCommitter.class,
+          StatusReporter.class };
+      args = new Object[] { conf, new TaskAttemptID(), null, null, null };
+      factory.setSuperclass(superType);
+    }
+    factory.setFilter(new MethodFilter() {
+      @Override
+      public boolean isHandled(Method m) {
+        String name = m.getName();
+        return "getConfiguration".equals(name) || "getCounter".equals(name) || "progress".equals(name);
+      }
+    });
+    MethodHandler handler = new MethodHandler() {
+      @Override
+      public Object invoke(Object arg0, Method m, Method arg2, Object[] args) throws Throwable {
+        String name = m.getName();
+        if ("getConfiguration".equals(name)) {
+          return conf;
+        } else if ("progress".equals(name)) {
+          // no-op
+          return null;
+        } else { // getCounter
+          if (args.length == 1) {
+            return MemPipeline.getCounters().findCounter((Enum<?>) args[0]);
+          } else {
+            return MemPipeline.getCounters().findCounter((String) args[0], (String) args[1]);
+          }
+        }
+      }
+    };
+    try {
+      Object newInstance = factory.create(types, args, handler);
+      return (TaskInputOutputContext<?, ?, ?, ?>) newInstance;
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+  }
+}
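
A hedged sketch of running a DoFn through MemCollection's parallelDo. Using
Writables.strings() as the output PType is an assumption for illustration;
the in-memory implementation stores objects directly and never serializes:

    import org.apache.crunch.DoFn;
    import org.apache.crunch.Emitter;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.impl.mem.MemPipeline;
    import org.apache.crunch.types.writable.Writables;

    public class MemParallelDoSketch {
      public static void main(String[] args) {
        PCollection<String> words = MemPipeline.collectionOf("crunch", "mem");
        PCollection<String> upper = words.parallelDo(new DoFn<String, String>() {
          @Override
          public void process(String input, Emitter<String> emitter) {
            emitter.emit(input.toUpperCase()); // runs inline, no MR job
          }
        }, Writables.strings());
        System.out.println(upper.materialize());
      }
    }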

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
new file mode 100644
index 0000000..d105bb4
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mem.collect;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.crunch.Aggregator;
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implements PGroupedTable<K, V> {
+
+  private final MemTable<K, V> parent;
+
+  private static <S, T> Iterable<Pair<S, Iterable<T>>> buildMap(MemTable<S, T> parent, GroupingOptions options) {
+    PType<S> keyType = parent.getKeyType();
+    Shuffler<S, T> shuffler = Shuffler.create(keyType, options, parent.getPipeline());
+
+    for (Pair<S, T> pair : parent.materialize()) {
+      shuffler.add(pair);
+    }
+
+    return shuffler;
+  }
+
+  public MemGroupedTable(MemTable<K, V> parent, GroupingOptions options) {
+    super(buildMap(parent, options));
+    this.parent = parent;
+  }
+
+  @Override
+  public PCollection<Pair<K, Iterable<V>>> union(PCollection<Pair<K, Iterable<V>>>... collections) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public PCollection<Pair<K, Iterable<V>>> write(Target target) {
+    getPipeline().write(this.ungroup(), target);
+    return this;
+  }
+
+  @Override
+  public PType<Pair<K, Iterable<V>>> getPType() {
+    PTableType<K, V> parentType = parent.getPTableType();
+    if (parentType != null) {
+      return parentType.getGroupedTableType();
+    }
+    return null;
+  }
+
+  @Override
+  public PTypeFamily getTypeFamily() {
+    return parent.getTypeFamily();
+  }
+
+  @Override
+  public long getSize() {
+    return 1; // getSize is only used for pipeline optimization in MR
+  }
+
+  @Override
+  public String getName() {
+    return "MemGrouped(" + parent.getName() + ")";
+  }
+
+  @Override
+  public PTable<K, V> combineValues(CombineFn<K, V> combineFn) {
+    return parallelDo(combineFn, parent.getPTableType());
+  }
+
+  @Override
+  public PTable<K, V> combineValues(Aggregator<V> agg) {
+    return combineValues(Aggregators.<K, V>toCombineFn(agg));
+  }
+
+  @Override
+  public PTable<K, V> ungroup() {
+    return parent;
+  }
+}
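
To illustrate the grouping path, a sketch that groups an in-memory table and
sums the values per key. The data is made up; Aggregators.SUM_LONGS() comes
from org.apache.crunch.fn.Aggregators:

    import org.apache.crunch.PTable;
    import org.apache.crunch.fn.Aggregators;
    import org.apache.crunch.impl.mem.MemPipeline;

    public class MemGroupSketch {
      public static void main(String[] args) {
        PTable<String, Long> counts =
            MemPipeline.tableOf("a", 1L, "a", 2L, "b", 3L);
        PTable<String, Long> summed =
            counts.groupByKey().combineValues(Aggregators.SUM_LONGS());
        System.out.println(summed.materialize()); // expect [(a,3), (b,3)]
      }
    }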

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
new file mode 100644
index 0000000..f8a5960
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mem.collect;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.crunch.FilterFn;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PObject;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Target;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.lib.Cogroup;
+import org.apache.crunch.lib.Join;
+import org.apache.crunch.lib.PTables;
+import org.apache.crunch.materialize.MaterializableMap;
+import org.apache.crunch.materialize.pobject.MapPObject;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+import com.google.common.collect.Lists;
+
+public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<K, V> {
+
+  private PTableType<K, V> ptype;
+
+  public MemTable(Iterable<Pair<K, V>> collect) {
+    this(collect, null, null);
+  }
+
+  public MemTable(Iterable<Pair<K, V>> collect, PTableType<K, V> ptype, String name) {
+    super(collect, ptype, name);
+    this.ptype = ptype;
+  }
+
+  @Override
+  public PTable<K, V> union(PTable<K, V> other) {
+    return union(new PTable[] { other });
+  }
+  
+  @Override
+  public PTable<K, V> union(PTable<K, V>... others) {
+    List<Pair<K, V>> values = Lists.newArrayList();
+    values.addAll(getCollection());
+    for (PTable<K, V> ptable : others) {
+      for (Pair<K, V> p : ptable.materialize()) {
+        values.add(p);
+      }
+    }
+    return new MemTable<K, V>(values, others[0].getPTableType(), null);
+  }
+
+  @Override
+  public PGroupedTable<K, V> groupByKey() {
+    return groupByKey(null);
+  }
+
+  @Override
+  public PGroupedTable<K, V> groupByKey(int numPartitions) {
+    return groupByKey(null);
+  }
+
+  @Override
+  public PGroupedTable<K, V> groupByKey(GroupingOptions options) {
+    return new MemGroupedTable<K, V>(this, options);
+  }
+
+  @Override
+  public PTable<K, V> write(Target target) {
+    super.write(target);
+    return this;
+  }
+
+  @Override
+  public PTable<K, V> write(Target target, Target.WriteMode writeMode) {
+    getPipeline().write(this, target, writeMode);
+    return this;
+  }
+  
+  @Override
+  public PTableType<K, V> getPTableType() {
+    return ptype;
+  }
+
+  @Override
+  public PType<K> getKeyType() {
+    if (ptype != null) {
+      return ptype.getKeyType();
+    }
+    return null;
+  }
+
+  @Override
+  public PType<V> getValueType() {
+    if (ptype != null) {
+      return ptype.getValueType();
+    }
+    return null;
+  }
+
+  @Override
+  public PTable<K, V> filter(FilterFn<Pair<K, V>> filterFn) {
+    return parallelDo(filterFn, getPTableType());
+  }
+  
+  @Override
+  public PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn) {
+    return parallelDo(name, filterFn, getPTableType());
+  }
+
+  @Override
+  public PTable<K, V> top(int count) {
+    return Aggregate.top(this, count, true);
+  }
+
+  @Override
+  public PTable<K, V> bottom(int count) {
+    return Aggregate.top(this, count, false);
+  }
+
+  @Override
+  public PTable<K, Collection<V>> collectValues() {
+    return Aggregate.collectValues(this);
+  }
+
+  @Override
+  public <U> PTable<K, Pair<V, U>> join(PTable<K, U> other) {
+    return Join.join(this, other);
+  }
+
+  @Override
+  public <U> PTable<K, Pair<Collection<V>, Collection<U>>> cogroup(PTable<K, U> other) {
+    return Cogroup.cogroup(this, other);
+  }
+
+  @Override
+  public PCollection<K> keys() {
+    return PTables.keys(this);
+  }
+
+  @Override
+  public PCollection<V> values() {
+    return PTables.values(this);
+  }
+
+  @Override
+  public Map<K, V> materializeToMap() {
+    return new MaterializableMap<K, V>(this.materialize());
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public PObject<Map<K, V>> asMap() {
+    return new MapPObject<K, V>(this);
+  }
+
+}
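
A quick sketch of the MemTable conveniences; the values are illustrative:

    import java.util.Map;
    import org.apache.crunch.PTable;
    import org.apache.crunch.impl.mem.MemPipeline;

    public class MemTableSketch {
      public static void main(String[] args) {
        PTable<String, Integer> table = MemPipeline.tableOf("x", 1, "y", 2);
        Map<String, Integer> asMap = table.materializeToMap();
        System.out.println(asMap.get("x"));             // 1
        System.out.println(table.keys().materialize()); // [x, y]
      }
    }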

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
new file mode 100644
index 0000000..2e8f9eb
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mem.collect;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.SingleUseIterable;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * In-memory versions of common MapReduce patterns for aggregating key-value data.
+ */
+abstract class Shuffler<K, V> implements Iterable<Pair<K, Iterable<V>>> {
+
+  public abstract void add(Pair<K, V> record);
+  
+  private static <K, V> Map<K, V> getMapForKeyType(PType<?> ptype) {
+    if (ptype != null && Comparable.class.isAssignableFrom(ptype.getTypeClass())) {
+      return new TreeMap<K, V>();
+    } else {
+      return Maps.newHashMap();
+    }
+  }
+  
+  public static <S, T> Shuffler<S, T> create(PType<S> keyType, GroupingOptions options,
+      Pipeline pipeline) {
+    Map<S, Collection<T>> map = getMapForKeyType(keyType);
+    
+    if (options != null) {
+      if (Pair.class.equals(keyType.getTypeClass()) && options.getGroupingComparatorClass() != null) {
+        PType<?> pairKey = keyType.getSubTypes().get(0);
+        return new SecondarySortShuffler(getMapForKeyType(pairKey));
+      } else if (options.getSortComparatorClass() != null) {
+        RawComparator<S> rc = ReflectionUtils.newInstance(options.getSortComparatorClass(),
+            pipeline.getConfiguration());
+        map = new TreeMap<S, Collection<T>>(rc);
+      }
+    }
+    
+    return new MapShuffler<S, T>(map);
+  }
+  
+  private static class HFunction<K, V> implements Function<Map.Entry<K, Collection<V>>, Pair<K, Iterable<V>>> {
+    @Override
+    public Pair<K, Iterable<V>> apply(Map.Entry<K, Collection<V>> input) {
+      return Pair.<K, Iterable<V>>of(input.getKey(), new SingleUseIterable<V>(input.getValue()));
+    }
+  }
+  
+  private static class MapShuffler<K, V> extends Shuffler<K, V> {
+    private final Map<K, Collection<V>> map;
+    
+    public MapShuffler(Map<K, Collection<V>> map) {
+      this.map = map;
+    }
+    
+    @Override
+    public Iterator<Pair<K, Iterable<V>>> iterator() {
+      return Iterators.transform(map.entrySet().iterator(),
+          new HFunction<K, V>());
+    }
+
+    @Override
+    public void add(Pair<K, V> record) {
+      if (!map.containsKey(record.first())) {
+        Collection<V> values = Lists.newArrayList();
+        map.put(record.first(), values);
+      }
+      map.get(record.first()).add(record.second());
+    }
+  }
+
+  private static class SSFunction<K, SK, V> implements
+      Function<Map.Entry<K, List<Pair<SK, V>>>, Pair<Pair<K, SK>, Iterable<V>>> {
+    @Override
+    public Pair<Pair<K, SK>, Iterable<V>> apply(Entry<K, List<Pair<SK, V>>> input) {
+      List<Pair<SK, V>> values = input.getValue();
+      Collections.sort(values, new Comparator<Pair<SK, V>>() {
+        @Override
+        public int compare(Pair<SK, V> o1, Pair<SK, V> o2) {
+          return ((Comparable) o1.first()).compareTo(o2.first());
+        }
+      });
+      Pair<K, SK> key = Pair.of(input.getKey(), values.get(0).first());
+      return Pair.of(key, Iterables.transform(values, new Function<Pair<SK, V>, V>() {
+        @Override
+        public V apply(Pair<SK, V> input) {
+          return input.second();
+        }
+      }));
+    }
+  }
+
+  private static class SecondarySortShuffler<K, SK, V> extends Shuffler<Pair<K, SK>, V> {
+
+    private Map<K, List<Pair<SK, V>>> map;
+    
+    public SecondarySortShuffler(Map<K, List<Pair<SK, V>>> map) {
+      this.map = map;
+    }
+    
+    @Override
+    public Iterator<Pair<Pair<K, SK>, Iterable<V>>> iterator() {
+      return Iterators.transform(map.entrySet().iterator(), new SSFunction<K, SK, V>());
+    }
+
+    @Override
+    public void add(Pair<Pair<K, SK>, V> record) {
+      K primary = record.first().first();
+      if (!map.containsKey(primary)) {
+        map.put(primary, Lists.<Pair<SK, V>>newArrayList());
+      }
+      map.get(primary).add(Pair.of(record.first().second(), record.second()));
+    }
+  }
+}
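
Shuffler and its subclasses are package-private, so a usage sketch has to
live in the same package. This one shows the create/add/iterate contract with
a null key type, which falls back to the HashMap-backed MapShuffler:

    package org.apache.crunch.impl.mem.collect;

    import org.apache.crunch.Pair;

    public class ShufflerSketch {
      public static void main(String[] args) {
        // Null PType, options, and pipeline are acceptable on this path.
        Shuffler<String, Integer> shuffler = Shuffler.create(null, null, null);
        shuffler.add(Pair.of("k", 1));
        shuffler.add(Pair.of("k", 2));
        for (Pair<String, Iterable<Integer>> entry : shuffler) {
          for (int v : entry.second()) { // values come wrapped in SingleUseIterable
            System.out.println(entry.first() + " -> " + v);
          }
        }
      }
    }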

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java
new file mode 100644
index 0000000..6976615
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mem.emit;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+
+import com.google.common.collect.Lists;
+
+/**
+ * An {@code Emitter} instance that writes emitted records to a backing
+ * {@code List}.
+ * 
+ * @param <T> the type of the records emitted
+ */
+public class InMemoryEmitter<T> implements Emitter<T> {
+
+  private final List<T> output;
+
+  public InMemoryEmitter() {
+    this(Lists.<T> newArrayList());
+  }
+
+  public InMemoryEmitter(List<T> output) {
+    this.output = output;
+  }
+
+  @Override
+  public void emit(T emitted) {
+    output.add(emitted);
+  }
+
+  @Override
+  public void flush() {
+    // No-op: emitted records are written directly to the backing List.
+  }
+
+  public List<T> getOutput() {
+    return output;
+  }
+}
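
Because InMemoryEmitter exposes its backing list, it is convenient for exercising a DoFn in a plain unit test without building a pipeline. A minimal sketch, with a hypothetical upper-casing DoFn:

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.impl.mem.emit.InMemoryEmitter;

public class UpperCaseFnSketch {
  public static void main(String[] args) {
    // Hypothetical DoFn under test: upper-cases each input string.
    DoFn<String, String> upperFn = new DoFn<String, String>() {
      @Override
      public void process(String input, Emitter<String> emitter) {
        emitter.emit(input.toUpperCase());
      }
    };
    InMemoryEmitter<String> emitter = new InMemoryEmitter<String>();
    upperFn.process("hello", emitter);
    upperFn.process("world", emitter);
    // Prints [HELLO, WORLD]: the emitter captured everything the fn emitted.
    System.out.println(emitter.getOutput());
  }
}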

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mem/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/package-info.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/package-info.java
new file mode 100644
index 0000000..a55b673
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * In-memory Pipeline implementation for rapid prototyping and testing.
+ */
+package org.apache.crunch.impl.mem;
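
The implementation this package declaration refers to is MemPipeline, defined elsewhere in the package. A quick prototyping sketch, assuming MemPipeline's getInstance() and collectionOf() factories and a hypothetical output path:

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mem.MemPipeline;

public class MemQuickStart {
  public static void main(String[] args) {
    // The in-memory pipeline is a singleton; no cluster is involved.
    Pipeline pipeline = MemPipeline.getInstance();
    // collectionOf builds an in-memory PCollection straight from the given values.
    PCollection<String> words = MemPipeline.collectionOf("apple", "banana", "apple");
    pipeline.writeTextFile(words, "/tmp/words");  // hypothetical output path
    pipeline.done();
  }
}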

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
new file mode 100644
index 0000000..00cf486
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineExecution;
+import org.apache.crunch.PipelineResult;
+import org.apache.crunch.Source;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.Target;
+import org.apache.crunch.Target.WriteMode;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.impl.mr.collect.InputCollection;
+import org.apache.crunch.impl.mr.collect.InputTable;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+import org.apache.crunch.impl.mr.collect.UnionCollection;
+import org.apache.crunch.impl.mr.collect.UnionTable;
+import org.apache.crunch.impl.mr.exec.MRExecutor;
+import org.apache.crunch.impl.mr.plan.MSCRPlanner;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.io.To;
+import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Pipeline implementation that is executed within Hadoop MapReduce.
+ */
+public class MRPipeline implements Pipeline {
+
+  private static final Log LOG = LogFactory.getLog(MRPipeline.class);
+
+  private static final Random RANDOM = new Random();
+
+  private final Class<?> jarClass;
+  private final String name;
+  private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
+  private final Map<PCollectionImpl<?>, MaterializableIterable<?>> outputTargetsToMaterialize;
+  private Path tempDirectory;
+  private int tempFileIndex;
+  private int nextAnonymousStageId;
+
+  private Configuration conf;
+
+  /**
+   * Instantiate with a default Configuration and name.
+   * 
+   * @param jarClass Class containing the main driver method for running the pipeline
+   */
+  public MRPipeline(Class<?> jarClass) {
+    this(jarClass, new Configuration());
+  }
+
+  /**
+   * Instantiate with a custom pipeline name. The name will be displayed in the Hadoop JobTracker.
+   * 
+   * @param jarClass Class containing the main driver method for running the pipeline
+   * @param name Display name of the pipeline
+   */
+  public MRPipeline(Class<?> jarClass, String name) {
+    this(jarClass, name, new Configuration());
+  }
+
+  /**
+   * Instantiate with a custom configuration and default naming.
+   * 
+   * @param jarClass Class containing the main driver method for running the pipeline
+   * @param conf Configuration to be used within all MapReduce jobs run in the pipeline
+   */
+  public MRPipeline(Class<?> jarClass, Configuration conf) {
+    this(jarClass, jarClass.getName(), conf);
+  }
+
+  /**
+   * Instantiate with a custom name and configuration. The name will be displayed in the Hadoop
+   * JobTracker.
+   * 
+   * @param jarClass Class containing the main driver method for running the pipeline
+   * @param name Display name of the pipeline
+   * @param conf Configuration to be used within all MapReduce jobs run in the pipeline
+   */
+  public MRPipeline(Class<?> jarClass, String name, Configuration conf) {
+    this.jarClass = jarClass;
+    this.name = name;
+    this.outputTargets = Maps.newHashMap();
+    this.outputTargetsToMaterialize = Maps.newHashMap();
+    this.conf = conf;
+    this.tempDirectory = createTempDirectory(conf);
+    this.tempFileIndex = 0;
+    this.nextAnonymousStageId = 0;
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public void setConfiguration(Configuration conf) {
+    this.conf = conf;
+    this.tempDirectory = createTempDirectory(conf);
+  }
+
+  public MRExecutor plan() {
+    Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize = Maps.newHashMap();
+    for (PCollectionImpl<?> c : outputTargets.keySet()) {
+      if (outputTargetsToMaterialize.containsKey(c)) {
+        toMaterialize.put(c, outputTargetsToMaterialize.get(c));
+        outputTargetsToMaterialize.remove(c);
+      }
+    }
+    MSCRPlanner planner = new MSCRPlanner(this, outputTargets, toMaterialize);
+    try {
+      return planner.plan(jarClass, conf);
+    } catch (IOException e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
+  @Override
+  public PipelineResult run() {
+    try {
+      PipelineExecution pipelineExecution = runAsync();
+      pipelineExecution.waitUntilDone();
+      return pipelineExecution.getResult();
+    } catch (InterruptedException e) {
+      // TODO: How to handle this without changing signature?
+      LOG.error("Exception running pipeline", e);
+      return PipelineResult.EMPTY;
+    }
+  }
+  
+  @Override
+  public PipelineExecution runAsync() {
+    PipelineExecution res = plan().execute();
+    outputTargets.clear();
+    return res;
+  }
+
+  @Override
+  public PipelineResult done() {
+    PipelineResult res = null;
+    if (!outputTargets.isEmpty()) {
+      res = run();
+    }
+    cleanup();
+    return res;
+  }
+
+  public <S> PCollection<S> read(Source<S> source) {
+    return new InputCollection<S>(source, this);
+  }
+
+  public <K, V> PTable<K, V> read(TableSource<K, V> source) {
+    return new InputTable<K, V>(source, this);
+  }
+
+  public PCollection<String> readTextFile(String pathName) {
+    return read(From.textFile(pathName));
+  }
+
+  public void write(PCollection<?> pcollection, Target target) {
+    write(pcollection, target, Target.WriteMode.DEFAULT);
+  }
+  
+  @SuppressWarnings("unchecked")
+  public void write(PCollection<?> pcollection, Target target,
+      Target.WriteMode writeMode) {
+    if (pcollection instanceof PGroupedTableImpl) {
+      pcollection = ((PGroupedTableImpl<?, ?>) pcollection).ungroup();
+    } else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) {
+      pcollection = pcollection.parallelDo("UnionCollectionWrapper",
+          (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
+    }
+    target.handleExisting(writeMode, getConfiguration());
+    if (writeMode != WriteMode.APPEND && targetInCurrentRun(target)) {
+      throw new CrunchRuntimeException("Target " + target + " is already written in current run." +
+          " Use WriteMode.APPEND in order to write additional data to it.");
+    }
+    addOutput((PCollectionImpl<?>) pcollection, target);
+  }
+
+  private boolean targetInCurrentRun(Target target) {
+    for (Set<Target> targets : outputTargets.values()) {
+      if (targets.contains(target)) {
+        return true;
+      }
+    }
+    return false;
+  }
+  
+  private void addOutput(PCollectionImpl<?> impl, Target target) {
+    if (!outputTargets.containsKey(impl)) {
+      outputTargets.put(impl, Sets.<Target> newHashSet());
+    }
+    outputTargets.get(impl).add(target);
+  }
+
+  @Override
+  public <T> Iterable<T> materialize(PCollection<T> pcollection) {
+
+    PCollectionImpl<T> pcollectionImpl = toPcollectionImpl(pcollection);
+    ReadableSource<T> readableSrc = getMaterializeSourceTarget(pcollectionImpl);
+
+    MaterializableIterable<T> c = new MaterializableIterable<T>(this, readableSrc);
+    if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) {
+      outputTargetsToMaterialize.put(pcollectionImpl, c);
+    }
+    return c;
+  }
+
+  /**
+   * Retrieve a {@code ReadableSource} that provides access to the contents of a
+   * {@link PCollection}. This is primarily intended as a helper method for
+   * {@link #materialize(PCollection)}. The underlying data of the ReadableSource may not
+   * actually be present until the pipeline is run.
+   * 
+   * @param pcollection The collection for which the ReadableSource is to be retrieved
+   * @return The ReadableSource
+   * @throws IllegalArgumentException If no ReadableSource can be retrieved for the given
+   *           PCollection
+   */
+  public <T> ReadableSource<T> getMaterializeSourceTarget(PCollection<T> pcollection) {
+    PCollectionImpl<T> impl = toPcollectionImpl(pcollection);
+
+    // First, check to see if this is a readable input collection.
+    if (impl instanceof InputCollection) {
+      InputCollection<T> ic = (InputCollection<T>) impl;
+      if (ic.getSource() instanceof ReadableSource) {
+        return (ReadableSource) ic.getSource();
+      } else {
+        throw new IllegalArgumentException(
+            "Cannot materialize non-readable input collection: " + ic);
+      }
+    } else if (impl instanceof InputTable) {
+      InputTable it = (InputTable) impl;
+      if (it.getSource() instanceof ReadableSource) {
+        return (ReadableSource) it.getSource();
+      } else {
+        throw new IllegalArgumentException(
+            "Cannot materialize non-readable input table: " + it);
+      }
+    }
+
+    // Next, check to see if this pcollection has already been materialized.
+    SourceTarget<T> matTarget = impl.getMaterializedAt();
+    if (matTarget instanceof ReadableSourceTarget) {
+      return (ReadableSourceTarget<T>) matTarget;
+    }
+    
+    // Check to see if we plan on materializing this collection on the
+    // next run.
+    if (outputTargets.containsKey(impl)) {
+      for (Target target : outputTargets.get(impl)) {
+        if (target instanceof ReadableSourceTarget) {
+          return (ReadableSourceTarget<T>) target;
+        }
+      }
+    }
+
+    // If we're not planning on materializing it already, create a temporary
+    // output to hold the materialized records and return that.
+    SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
+    if (!(st instanceof ReadableSourceTarget)) {
+      throw new IllegalArgumentException("The PType for the given PCollection is not readable"
+          + " and cannot be materialized");
+    } else {
+      ReadableSourceTarget<T> srcTarget = (ReadableSourceTarget<T>) st;
+      addOutput(impl, srcTarget);
+      return srcTarget;
+    }
+  }
+
+  /**
+   * Safely cast a PCollection into a PCollectionImpl, including handling the case of
+   * UnionCollections.
+   * 
+   * @param pcollection The PCollection to be cast/transformed
+   * @return The PCollectionImpl representation
+   */
+  private <T> PCollectionImpl<T> toPcollectionImpl(PCollection<T> pcollection) {
+    PCollectionImpl<T> pcollectionImpl = null;
+    if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) {
+      pcollectionImpl = (PCollectionImpl<T>) pcollection.parallelDo("UnionCollectionWrapper",
+          (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
+    } else {
+      pcollectionImpl = (PCollectionImpl<T>) pcollection;
+    }
+    return pcollectionImpl;
+  }
+
+  public <T> SourceTarget<T> createIntermediateOutput(PType<T> ptype) {
+    return ptype.getDefaultFileSource(createTempPath());
+  }
+
+  public Path createTempPath() {
+    tempFileIndex++;
+    return new Path(tempDirectory, "p" + tempFileIndex);
+  }
+
+  private static Path createTempDirectory(Configuration conf) {
+    Path dir = createTemporaryPath(conf);
+    try {
+      dir.getFileSystem(conf).mkdirs(dir);
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot create job output directory " + dir, e);
+    }
+    return dir;
+  }
+
+  private static Path createTemporaryPath(Configuration conf) {
+    String baseDir = conf.get(RuntimeParameters.TMP_DIR, "/tmp");
+    return new Path(baseDir, "crunch-" + (RANDOM.nextInt() & Integer.MAX_VALUE));
+  }
+
+  @Override
+  public <T> void writeTextFile(PCollection<T> pcollection, String pathName) {
+    pcollection.parallelDo("asText", new StringifyFn<T>(), Writables.strings())
+        .write(To.textFile(pathName));
+  }
+
+  private static class StringifyFn<T> extends MapFn<T, String> {
+    @Override
+    public String map(T input) {
+      return input.toString();
+    }
+  }
+  
+  private void cleanup() {
+    if (!outputTargets.isEmpty()) {
+      LOG.warn("Not running cleanup while output targets remain");
+      return;
+    }
+    try {
+      FileSystem fs = tempDirectory.getFileSystem(conf);
+      if (fs.exists(tempDirectory)) {
+        fs.delete(tempDirectory, true);
+      }
+    } catch (IOException e) {
+      LOG.info("Exception during cleanup", e);
+    }
+  }
+
+  public int getNextAnonymousStageId() {
+    return nextAnonymousStageId++;
+  }
+
+  @Override
+  public void enableDebug() {
+    // Turn on Crunch runtime error catching.
+    getConfiguration().setBoolean(RuntimeParameters.DEBUG, true);
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+}
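
Taken together, the public surface above supports a short driver program. A minimal sketch, assuming hypothetical input/output paths and driver class name; note that done() runs any pending outputs and then deletes the pipeline's temporary directory:

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.hadoop.conf.Configuration;

public class TextCopyDriver {
  public static void main(String[] args) {
    // jarClass locates the jar shipped to the cluster; the Configuration is
    // shared by every MapReduce job the planner creates.
    Pipeline pipeline = new MRPipeline(TextCopyDriver.class, new Configuration());
    PCollection<String> lines = pipeline.readTextFile("/data/in");  // hypothetical path
    pipeline.writeTextFile(lines, "/data/out");                     // hypothetical path
    PipelineResult result = pipeline.done();  // runs pending targets, then cleans up
    System.exit(result != null && result.succeeded() ? 0 : 1);
  }
}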

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
new file mode 100644
index 0000000..7b8f2ea
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.types.PType;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+public class DoCollectionImpl<S> extends PCollectionImpl<S> {
+
+  private final PCollectionImpl<Object> parent;
+  private final DoFn<Object, S> fn;
+  private final PType<S> ntype;
+
+  <T> DoCollectionImpl(String name, PCollectionImpl<T> parent, DoFn<T, S> fn, PType<S> ntype) {
+    this(name, parent, fn, ntype, ParallelDoOptions.builder().build());
+  }
+  
+  <T> DoCollectionImpl(String name, PCollectionImpl<T> parent, DoFn<T, S> fn, PType<S> ntype,
+      ParallelDoOptions options) {
+    super(name, options);
+    this.parent = (PCollectionImpl<Object>) parent;
+    this.fn = (DoFn<Object, S>) fn;
+    this.ntype = ntype;
+  }
+
+  @Override
+  protected long getSizeInternal() {
+    return (long) (fn.scaleFactor() * parent.getSize());
+  }
+
+  @Override
+  public PType<S> getPType() {
+    return ntype;
+  }
+
+  @Override
+  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
+    visitor.visitDoFnCollection(this);
+  }
+
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.<PCollectionImpl<?>> of(parent);
+  }
+
+  @Override
+  public DoNode createDoNode() {
+    return DoNode.createFnNode(getName(), fn, ntype);
+  }
+}
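
getSizeInternal() above estimates an output size by scaling the parent's size with the DoFn's scaleFactor(), which the planner uses when sizing jobs. A user-defined DoFn can override that hint; a small sketch (the 10% figure is a made-up estimate for a hypothetical filter):

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

// A filtering DoFn that advertises a low scale factor, so the planner's
// size estimate for the resulting DoCollectionImpl shrinks accordingly.
public class StartsWithA extends DoFn<String, String> {
  @Override
  public void process(String input, Emitter<String> emitter) {
    if (input.startsWith("a")) {
      emitter.emit(input);
    }
  }

  @Override
  public float scaleFactor() {
    return 0.1f;  // hypothetical estimate: roughly 10% of inputs pass the filter
  }
}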

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
new file mode 100644
index 0000000..176643b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import java.util.List;
+
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+import com.google.common.collect.ImmutableList;
+
+public class DoTableImpl<K, V> extends PTableBase<K, V> implements PTable<K, V> {
+
+  private final PCollectionImpl<?> parent;
+  private final DoFn<?, Pair<K, V>> fn;
+  private final PTableType<K, V> type;
+
+  <S> DoTableImpl(String name, PCollectionImpl<S> parent, DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype) {
+    this(name, parent, fn, ntype, ParallelDoOptions.builder().build());
+  }
+  
+  <S> DoTableImpl(String name, PCollectionImpl<S> parent, DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype,
+      ParallelDoOptions options) {
+    super(name, options);
+    this.parent = parent;
+    this.fn = fn;
+    this.type = ntype;
+  }
+
+  @Override
+  protected long getSizeInternal() {
+    return (long) (fn.scaleFactor() * parent.getSize());
+  }
+
+  @Override
+  public PTableType<K, V> getPTableType() {
+    return type;
+  }
+
+  @Override
+  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
+    visitor.visitDoTable(this);
+  }
+
+  @Override
+  public PType<Pair<K, V>> getPType() {
+    return type;
+  }
+
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.<PCollectionImpl<?>> of(parent);
+  }
+
+  @Override
+  public DoNode createDoNode() {
+    return DoNode.createFnNode(getName(), fn, type);
+  }
+
+  public boolean hasCombineFn() {
+    return fn instanceof CombineFn;
+  }
+}
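
hasCombineFn() is how the planner recognizes a reduce-side stage that can also run as a map-side combiner: a PTable produced via combineValues ends up as a DoTableImpl whose fn is a CombineFn. A sketch of the user-facing side, assuming the CombineFn.SUM_LONGS() factory from this codebase and a caller-supplied words collection:

import org.apache.crunch.CombineFn;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.types.writable.Writables;

public class WordCountSketch {
  public static PTable<String, Long> wordCounts(PCollection<String> words) {
    return words
        .parallelDo(new MapFn<String, Pair<String, Long>>() {
          @Override
          public Pair<String, Long> map(String input) {
            return Pair.of(input, 1L);
          }
        }, Writables.tableOf(Writables.strings(), Writables.longs()))
        .groupByKey()
        // combineValues wraps the CombineFn in a DoTableImpl, so hasCombineFn()
        // reports true and the planner can schedule a map-side combiner.
        .combineValues(CombineFn.<String> SUM_LONGS());
  }
}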

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
new file mode 100644
index 0000000..ace5cc1
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import java.util.List;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.Source;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.types.PType;
+
+import com.google.common.collect.ImmutableList;
+
+public class InputCollection<S> extends PCollectionImpl<S> {
+
+  private final Source<S> source;
+
+  public InputCollection(Source<S> source, MRPipeline pipeline) {
+    super(source.toString());
+    this.source = source;
+    this.pipeline = pipeline;
+  }
+
+  @Override
+  public PType<S> getPType() {
+    return source.getType();
+  }
+
+  public Source<S> getSource() {
+    return source;
+  }
+
+  @Override
+  protected long getSizeInternal() {
+    long sz = source.getSize(pipeline.getConfiguration());
+    if (sz < 0) {
+      throw new IllegalStateException("Input source " + source + " does not exist!");
+    }
+    return sz;
+  }
+
+  @Override
+  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
+    visitor.visitInputCollection(this);
+  }
+
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  public DoNode createDoNode() {
+    return DoNode.createInputNode(source);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof InputCollection)) {
+      return false;
+    }
+    return source.equals(((InputCollection) obj).source);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(source).toHashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
new file mode 100644
index 0000000..71f11c5
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import java.util.List;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+import com.google.common.collect.ImmutableList;
+
+public class InputTable<K, V> extends PTableBase<K, V> {
+
+  private final TableSource<K, V> source;
+  private final InputCollection<Pair<K, V>> asCollection;
+
+  public InputTable(TableSource<K, V> source, MRPipeline pipeline) {
+    super(source.toString());
+    this.source = source;
+    this.pipeline = pipeline;
+    this.asCollection = new InputCollection<Pair<K, V>>(source, pipeline);
+  }
+
+  public TableSource<K, V> getSource() {
+    return source;
+  }
+  
+  @Override
+  protected long getSizeInternal() {
+    return asCollection.getSizeInternal();
+  }
+
+  @Override
+  public PTableType<K, V> getPTableType() {
+    return source.getTableType();
+  }
+
+  @Override
+  public PType<Pair<K, V>> getPType() {
+    return source.getType();
+  }
+
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
+    visitor.visitInputCollection(asCollection);
+  }
+
+  @Override
+  public DoNode createDoNode() {
+    return DoNode.createInputNode(source);
+  }
+
+  @Override
+  public int hashCode() {
+    return asCollection.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    // Two InputTables are equal iff their underlying sources are equal.
+    return other instanceof InputTable && asCollection.equals(((InputTable<?, ?>) other).asCollection);
+  }
+}