Posted to commits@crunch.apache.org by jw...@apache.org on 2012/08/08 02:47:06 UTC

[10/10] CRUNCH-32: Clean up namespaces.

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PTable.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PTable.scala b/scrunch/src/main/scala/org/apache/scrunch/PTable.scala
deleted file mode 100644
index 7f506e5..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/PTable.scala
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import java.util.{Collection => JCollect}
-
-import scala.collection.JavaConversions._
-
-import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
-import org.apache.crunch.{GroupingOptions, PTable => JTable, Pair => CPair}
-import org.apache.crunch.lib.{Join, Cartesian, Aggregate, Cogroup, PTables}
-import org.apache.scrunch.interpreter.InterpreterRunner
-
-class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V], PTable[K, V], JTable[K, V]] {
-  import PTable._
-
-  def filter(f: (K, V) => Boolean): PTable[K, V] = {
-    parallelDo(filterFn[K, V](f), native.getPTableType())
-  }
-
-  def map[T, To](f: (K, V) => T)
-      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
-    b(this, mapFn(f), pt.get(getTypeFamily()))
-  }
-
-  def mapValues[T](f: V => T)(implicit pt: PTypeH[T]) = {
-    val ptf = getTypeFamily()
-    val ptype = ptf.tableOf(native.getKeyType(), pt.get(ptf))
-    parallelDo(mapValuesFn[K, V, T](f), ptype)
-  }
-
-  def mapKeys[T](f: K => T)(implicit pt: PTypeH[T]) = {
-    val ptf = getTypeFamily()
-    val ptype = ptf.tableOf(pt.get(ptf), native.getValueType())
-    parallelDo(mapKeysFn[K, V, T](f), ptype)
-  }
-
-  def flatMap[T, To](f: (K, V) => Traversable[T])
-      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
-    b(this, flatMapFn(f), pt.get(getTypeFamily()))
-  }
-
-  def union(others: PTable[K, V]*) = {
-    new PTable[K, V](native.union(others.map(_.native) : _*))
-  }
-
-  def keys() = new PCollection[K](PTables.keys(native))
-
-  def values() = new PCollection[V](PTables.values(native))
-
-  def cogroup[V2](other: PTable[K, V2]) = {
-    val jres = Cogroup.cogroup[K, V, V2](this.native, other.native)
-    val ptf = getTypeFamily()
-    val inter = new PTable[K, CPair[JCollect[V], JCollect[V2]]](jres)
-    inter.parallelDo(new SMapTableValuesFn[K, CPair[JCollect[V], JCollect[V2]], (Iterable[V], Iterable[V2])] {
-      def apply(x: CPair[JCollect[V], JCollect[V2]]) = {
-        (collectionAsScalaIterable[V](x.first()), collectionAsScalaIterable[V2](x.second()))
-      }
-    }, ptf.tableOf(keyType, ptf.tuple2(ptf.collections(valueType), ptf.collections(other.valueType))))
-  }
-
-  type JoinFn[V2] = (JTable[K, V], JTable[K, V2]) => JTable[K, CPair[V, V2]]
-
-  protected def join[V2](joinFn: JoinFn[V2], other: PTable[K, V2]): PTable[K, (V, V2)] = {
-    val jres = joinFn(this.native, other.native)
-    val ptf = getTypeFamily()
-    val ptype = ptf.tableOf(keyType, ptf.tuple2(valueType, other.valueType))
-    val inter = new PTable[K, CPair[V, V2]](jres)
-    inter.parallelDo(new SMapTableValuesFn[K, CPair[V, V2], (V, V2)] {
-      def apply(x: CPair[V, V2]) = (x.first(), x.second())
-    }, ptype)
-  }
-
-  def join[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
-    innerJoin(other)
-  }
-
-  def innerJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
-    join[V2](Join.innerJoin[K, V, V2](_, _), other)
-  }
-
-  def leftJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
-    join[V2](Join.leftJoin[K, V, V2](_, _), other)
-  }
-
-  def rightJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
-    join[V2](Join.rightJoin[K, V, V2](_, _), other)
-  }
-
-  def fullJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
-    join[V2](Join.fullJoin[K, V, V2](_, _), other)
-  }
-
-  def cross[K2, V2](other: PTable[K2, V2]): PTable[(K, K2), (V, V2)] = {
-    val ptf = getTypeFamily()
-    val inter = new PTable(Cartesian.cross(this.native, other.native))
-    val f = (k: CPair[K,K2], v: CPair[V,V2]) => CPair.of((k.first(), k.second()), (v.first(), v.second()))
-    inter.parallelDo(mapFn(f), ptf.tableOf(ptf.tuple2(keyType, other.keyType), ptf.tuple2(valueType, other.valueType)))
-  }
-
-  def top(limit: Int, maximize: Boolean) = {
-    wrap(Aggregate.top(this.native, limit, maximize))
-  }
-
-  def groupByKey() = new PGroupedTable(native.groupByKey())
-
-  def groupByKey(partitions: Int) = new PGroupedTable(native.groupByKey(partitions))
-
-  def groupByKey(options: GroupingOptions) = new PGroupedTable(native.groupByKey(options))
-
-  def wrap(newNative: AnyRef) = {
-    new PTable[K, V](newNative.asInstanceOf[JTable[K, V]])
-  }
-
-  def unwrap(sc: PTable[K, V]): JTable[K, V] = sc.native
-
-  def materialize(): Iterable[(K, V)] = {
-    InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
-    native.materialize.view.map(x => (x.first, x.second))
-  }
-
-  def materializeToMap(): Map[K, V] = {
-    InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
-    native.materializeToMap().view.toMap
-  }
-
-  def keyType() = native.getPTableType().getKeyType()
-
-  def valueType() = native.getPTableType().getValueType()
-}
-
-trait SFilterTableFn[K, V] extends FilterFn[CPair[K, V]] with Function2[K, V, Boolean] {
-  override def accept(input: CPair[K, V]) = apply(input.first(), input.second())
-}
-
-trait SDoTableFn[K, V, T] extends DoFn[CPair[K, V], T] with Function2[K, V, Traversable[T]] {
-  override def process(input: CPair[K, V], emitter: Emitter[T]) {
-    for (v <- apply(input.first(), input.second())) {
-      emitter.emit(v)
-    }
-  }
-}
-
-trait SMapTableFn[K, V, T] extends MapFn[CPair[K, V], T] with Function2[K, V, T] {
-  override def map(input: CPair[K, V]) = apply(input.first(), input.second())
-}
-
-trait SMapTableValuesFn[K, V, T] extends MapFn[CPair[K, V], CPair[K, T]] with Function1[V, T] {
-  override def map(input: CPair[K, V]) = CPair.of(input.first(), apply(input.second()))
-}
-
-trait SMapTableKeysFn[K, V, T] extends MapFn[CPair[K, V], CPair[T, V]] with Function1[K, T] {
-  override def map(input: CPair[K, V]) = CPair.of(apply(input.first()), input.second())
-}
-
-object PTable {
-  def filterFn[K, V](fn: (K, V) => Boolean) = {
-    new SFilterTableFn[K, V] { def apply(k: K, v: V) = fn(k, v) }
-  }
-
-  def mapValuesFn[K, V, T](fn: V => T) = {
-    new SMapTableValuesFn[K, V, T] { def apply(v: V) = fn(v) }
-  }
-
-  def mapKeysFn[K, V, T](fn: K => T) = {
-    new SMapTableKeysFn[K, V, T] { def apply(k: K) = fn(k) }
-  }
-
-  def mapFn[K, V, T](fn: (K, V) => T) = {
-    new SMapTableFn[K, V, T] { def apply(k: K, v: V) = fn(k, v) }
-  }
-
-  def flatMapFn[K, V, T](fn: (K, V) => Traversable[T]) = {
-    new SDoTableFn[K, V, T] { def apply(k: K, v: V) = fn(k, v) }
-  }
-}
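
For context, a minimal sketch of the PTable API deleted above (not part of this patch). It assumes an existing PTable[String, Double] of sales by region, and the usual implicit PTypeH instances for primitive and tuple types in scope:

    import org.apache.scrunch.PTable

    object PTableSketch {
      // Keep positive sales, apply a hypothetical 10% adjustment, and take
      // the ten regions with the largest adjusted values.
      def topRegions(sales: PTable[String, Double]) =
        sales
          .filter((region, amount) => amount > 0.0)
          .mapValues(amount => amount * 1.1)
          .top(10, true)

      // Inner join against a second table keyed the same way; the result is
      // a PTable[String, (Double, String)].
      def withNames(sales: PTable[String, Double], names: PTable[String, String]) =
        sales.join(names)
    }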

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PTypeFamily.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PTypeFamily.scala b/scrunch/src/main/scala/org/apache/scrunch/PTypeFamily.scala
deleted file mode 100644
index 1bd3db6..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/PTypeFamily.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.{Pair => CPair, Tuple3 => CTuple3, Tuple4 => CTuple4, MapFn}
-import org.apache.crunch.types.{PType, PTypeFamily => PTF}
-import org.apache.crunch.types.writable.WritableTypeFamily
-import org.apache.crunch.types.avro.{AvroTypeFamily, Avros => CAvros}
-import java.lang.{Long => JLong, Double => JDouble, Integer => JInt, Float => JFloat, Boolean => JBoolean}
-import java.util.{Collection => JCollection}
-import scala.collection.JavaConversions._
-
-class TMapFn[S, T](f: S => T) extends MapFn[S, T] {
-  override def map(input: S) = f(input)
-}
-
-trait PTypeFamily {
-
-  def ptf: PTF
-
-  val strings = ptf.strings()
-
-  val bytes = ptf.bytes()
-
-  def records[T: ClassManifest] = ptf.records(classManifest[T].erasure)
-
-  def derived[S, T](cls: java.lang.Class[T], in: S => T, out: T => S, pt: PType[S]) = {
-    ptf.derived(cls, new TMapFn[S, T](in), new TMapFn[T, S](out), pt)
-  }
-
-  val longs = {
-    val in = (x: JLong) => x.longValue()
-    val out = (x: Long) => new JLong(x)
-    derived(classOf[Long], in, out, ptf.longs())
-  }
-
-  val ints = {
-    val in = (x: JInt) => x.intValue()
-    val out = (x: Int) => new JInt(x)
-    derived(classOf[Int], in, out, ptf.ints())
-  }
-
-  val floats = {
-    val in = (x: JFloat) => x.floatValue()
-    val out = (x: Float) => new JFloat(x)
-    derived(classOf[Float], in, out, ptf.floats())
-  }
-
-  val doubles = {
-    val in = (x: JDouble) => x.doubleValue()
-    val out = (x: Double) => new JDouble(x)
-    derived(classOf[Double], in, out, ptf.doubles())
-  }
-
-  val booleans = {
-    val in = (x: JBoolean) => x.booleanValue()
-    val out = (x: Boolean) => new JBoolean(x)
-    derived(classOf[Boolean], in, out, ptf.booleans())
-  }
-
-  def collections[T](ptype: PType[T]) = {
-    derived(classOf[Iterable[T]], collectionAsScalaIterable[T], asJavaCollection[T], ptf.collections(ptype))
-  }
-
-  def maps[T](ptype: PType[T]) = {
-    derived(classOf[scala.collection.Map[String, T]], mapAsScalaMap[String, T], mapAsJavaMap[String, T], ptf.maps(ptype))
-  }
-
-  def lists[T](ptype: PType[T]) = {
-    val in = (x: JCollection[T]) => collectionAsScalaIterable[T](x).toList
-    val out = (x: List[T]) => asJavaCollection[T](x)
-    derived(classOf[List[T]], in, out, ptf.collections(ptype))
-  }
-
-  def sets[T](ptype: PType[T]) = {
-    val in = (x: JCollection[T]) => collectionAsScalaIterable[T](x).toSet
-    val out = (x: Set[T]) => asJavaCollection[T](x)
-    derived(classOf[Set[T]], in, out, ptf.collections(ptype))
-  }
-
-  def tuple2[T1, T2](p1: PType[T1], p2: PType[T2]) = {
-    val in = (x: CPair[T1, T2]) => (x.first(), x.second())
-    val out = (x: (T1, T2)) => CPair.of(x._1, x._2)
-    derived(classOf[(T1, T2)], in, out, ptf.pairs(p1, p2))
-  }
-
-  def tuple3[T1, T2, T3](p1: PType[T1], p2: PType[T2], p3: PType[T3]) = {
-    val in = (x: CTuple3[T1, T2, T3]) => (x.first(), x.second(), x.third())
-    val out = (x: (T1, T2, T3)) => CTuple3.of(x._1, x._2, x._3)
-    derived(classOf[(T1, T2, T3)], in, out, ptf.triples(p1, p2, p3))
-  }
-
-  def tuple4[T1, T2, T3, T4](p1: PType[T1], p2: PType[T2], p3: PType[T3], p4: PType[T4]) = {
-    val in = (x: CTuple4[T1, T2, T3, T4]) => (x.first(), x.second(), x.third(), x.fourth())
-    val out = (x: (T1, T2, T3, T4)) => CTuple4.of(x._1, x._2, x._3, x._4)
-    derived(classOf[(T1, T2, T3, T4)], in, out, ptf.quads(p1, p2, p3, p4))
-  }
-
-  def tableOf[K, V](keyType: PType[K], valueType: PType[V]) = ptf.tableOf(keyType, valueType)
-}
-
-object Writables extends PTypeFamily {
-  override def ptf = WritableTypeFamily.getInstance()
-}
-
-object Avros extends PTypeFamily {
-  override def ptf = AvroTypeFamily.getInstance()
-
-  CAvros.REFLECT_DATA_FACTORY = new ScalaReflectDataFactory()
-
-  def reflects[T: ClassManifest]() = CAvros.reflects(classManifest[T].erasure)
-}
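
For context, a sketch (not part of this patch) of composing PTypes with the type families above; Click is a hypothetical record class:

    import org.apache.scrunch.Avros

    object PTypesSketch {
      // A PTableType[String, Long] for word counts.
      val wordCounts = Avros.tableOf(Avros.strings, Avros.longs)

      // A PType for (String, Iterable[Int]) pairs, built by composition.
      val grouped = Avros.tuple2(Avros.strings, Avros.collections(Avros.ints))

      // Avro reflection-based serialization for a Scala case class, via the
      // ScalaReflectDataFactory installed above.
      case class Click(url: String, count: Int)
      val clicks = Avros.reflects[Click]()
    }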

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala b/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala
deleted file mode 100644
index fa13d3a..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import java.io.File
-
-import org.apache.hadoop.conf.Configuration
-import org.slf4j.LoggerFactory
-
-import org.apache.crunch.{Pipeline => JPipeline}
-import org.apache.crunch.impl.mem.MemPipeline
-import org.apache.crunch.impl.mr.MRPipeline
-import org.apache.crunch.util.DistCache
-import org.apache.scrunch.interpreter.InterpreterRunner
-
-/**
- * Manages the state of a pipeline execution.
- *
- * ==Overview==
- * There are two subtypes of [[org.apache.scrunch.Pipeline]]:
- * [[org.apache.scrunch.Pipeline.MapReducePipeline]] - for jobs run on a Hadoop cluster.
- * [[org.apache.scrunch.Pipeline.InMemoryPipeline]] - for jobs run in memory.
- *
- * To create a Hadoop pipeline:
- * {{{
- * import org.apache.scrunch.Pipeline
- *
- * Pipeline.mapReduce[MyClass]
- * }}}
- *
- * To get an in-memory pipeline:
- * {{{
- * import org.apache.scrunch.Pipeline
- *
- * Pipeline.inMemory
- * }}}
- */
-class Pipeline(val jpipeline: JPipeline) extends PipelineLike {
-  /**
-   * A convenience method for reading a text file.
-   *
-   * @param pathName Path to desired text file.
-   * @return A PCollection containing the lines in the specified file.
-   */
-  def readTextFile(pathName: String): PCollection[String] = {
-    new PCollection[String](jpipeline.readTextFile(pathName))
-  }
-
-  /**
-   * A convenience method for writing a text file.
-   *
-   * @param pcollect A PCollection to write to text.
-   * @param pathName Path to desired output text file.
-   */
-  def writeTextFile[T](pcollect: PCollection[T], pathName: String) {
-    jpipeline.writeTextFile(pcollect.native, pathName)
-  }
-}
-
-/**
- * Companion object. Contains subclasses of Pipeline.
- */
-object Pipeline {
-  val log = LoggerFactory.getLogger(classOf[Pipeline])
-
-  /**
-   * Pipeline for running jobs on a Hadoop cluster.
-   *
-   * @param clazz Type of the class using the pipeline.
-   * @param configuration Hadoop configuration to use.
-   */
-  class MapReducePipeline (clazz: Class[_], configuration: Configuration) extends Pipeline(
-      {
-        // Attempt to add all jars in the Scrunch distribution lib directory to the job that will
-        // be run.
-        val jarPath = DistCache.findContainingJar(classOf[org.apache.scrunch.Pipeline])
-        if (jarPath != null) {
-          val scrunchJarFile = new File(jarPath)
-          DistCache.addJarDirToDistributedCache(configuration, scrunchJarFile.getParent())
-        } else {
-          log.warn("Could not locate Scrunch jar file, so could not add Scrunch jars to the " +
-              "job(s) about to be run.")
-        }
-        if (InterpreterRunner.repl == null) {
-          new MRPipeline(clazz, configuration)
-        } else {
-          // We're running in the REPL, so we'll use the crunch jar as the job jar.
-          new MRPipeline(classOf[org.apache.scrunch.Pipeline], configuration)
-        }
-      })
-
-  /**
-   * Pipeline for running jobs in memory.
-   */
-  object InMemoryPipeline extends Pipeline(MemPipeline.getInstance())
-
-  /**
-   * Creates a pipeline for running jobs on a Hadoop cluster using the default configuration.
-   *
-   * @param clazz Type of the class using the pipeline.
-   */
-  def mapReduce(clazz: Class[_]): MapReducePipeline = mapReduce(clazz, new Configuration())
-
-  /**
-   * Creates a pipeline for running jobs on a Hadoop cluster.
-   *
-   * @param clazz Type of the class using the pipeline.
-   * @param configuration Hadoop configuration to use.
-   */
-  def mapReduce(clazz: Class[_], configuration: Configuration): MapReducePipeline = {
-    new MapReducePipeline(clazz, configuration)
-  }
-
-  /**
-   * Creates a pipeline for running jobs on a Hadoop cluster using the default configuration.
-   *
-   * @tparam T Type of the class using the pipeline.
-   */
-  def mapReduce[T : ClassManifest]: MapReducePipeline = mapReduce[T](new Configuration())
-
-  /**
-   * Creates a pipeline for running jobs on a Hadoop cluster.
-   *
-   * @param configuration Hadoop configuration to use.
-   * @tparam T Type of the class using the pipeline.
-   */
-  def mapReduce[T : ClassManifest](configuration: Configuration): MapReducePipeline = {
-    new MapReducePipeline(implicitly[ClassManifest[T]].erasure, configuration)
-  }
-
-  /**
-   * Gets a pipeline for running jobs in memory.
-   */
-  def inMemory: InMemoryPipeline.type = InMemoryPipeline
-
-  /**
-   * Creates a new Pipeline according to the provided specifications.
-   *
-   * @param configuration Configuration for connecting to a Hadoop cluster.
-   * @param memory Whether to create an in-memory pipeline (true) or a MapReduce pipeline (false).
-   * @param manifest ClassManifest for the class using the pipeline.
-   * @tparam T type of the class using the pipeline.
-   * @deprecated Use either {{{Pipeline.mapReduce(class, conf)}}} or {{{Pipeline.inMemory}}}
-   */
-  def apply[T](
-    configuration: Configuration = new Configuration(),
-    memory: Boolean = false)(implicit manifest: ClassManifest[T]
-  ): Pipeline = if (memory) inMemory else mapReduce(manifest.erasure, configuration)
-}
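
For context, a sketch (not part of this patch) of the factory methods above; WordCount and the paths are hypothetical:

    import org.apache.hadoop.conf.Configuration
    import org.apache.scrunch.Pipeline

    object PipelineSketch {
      class WordCount // hypothetical class used to locate the containing job jar

      def main(args: Array[String]) {
        val mr  = Pipeline.mapReduce[WordCount]                      // Hadoop, default Configuration
        val mr2 = Pipeline.mapReduce[WordCount](new Configuration()) // Hadoop, explicit Configuration
        val mem = Pipeline.inMemory                                  // in-memory, handy for tests

        val lines = mem.readTextFile("/tmp/input.txt")               // hypothetical path
        mem.writeTextFile(lines, "/tmp/output")                      // hypothetical path
        mem.done()
      }
    }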

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PipelineApp.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PipelineApp.scala b/scrunch/src/main/scala/org/apache/scrunch/PipelineApp.scala
deleted file mode 100644
index b427f20..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/PipelineApp.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import java.io.Serializable
-
-import scala.collection.mutable.ListBuffer
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.util.GenericOptionsParser
-
-import org.apache.crunch.{Source, TableSource, Target}
-
-trait PipelineApp extends MREmbeddedPipeline with PipelineHelper with DelayedInit {
-  implicit def _string2path(str: String) = new Path(str)
-
-  /** Contains factory methods used to create `Source`s. */
-  val from = From
-
-  /** Contains factory methods used to create `Target`s. */
-  val to = To
-
-  /** Contains factory methods used to create `SourceTarget`s. */
-  val at = At
-
-  private val initCode = new ListBuffer[() => Unit]
-
-  private var _args: Array[String] = _
-
-  /** Command-line arguments passed to this application. */
-  protected def args: Array[String] = _args
-
-  def configuration: Configuration = pipeline.getConfiguration
-
-  /** Gets the distributed filesystem associated with this application's configuration. */
-  def fs: FileSystem = FileSystem.get(configuration)
-
-  override def delayedInit(body: => Unit) {
-    initCode += (() => body)
-  }
-
-  def main(args: Array[String]) = {
-    val parser = new GenericOptionsParser(configuration, args)
-    _args = parser.getRemainingArgs()
-    for (proc <- initCode) proc()
-    done
-  }
-}
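
For context, a sketch (not part of this patch) of a complete application built on this trait. The body runs via delayedInit after GenericOptionsParser has populated args; `from`/`to` are the factory objects above (assumed to expose textFile, as in Crunch's From/To), and flatMap/filter/count assume the PCollection API removed earlier in this patch:

    import org.apache.scrunch.PipelineApp

    object WordCountApp extends PipelineApp {
      // args(0): input text file, args(1): output directory (both hypothetical).
      val lines = pipeline.read(from.textFile(args(0)))
      val counts = lines
        .flatMap(_.toLowerCase.split("\\W+"))
        .filter(!_.isEmpty())
        .count
      pipeline.write(counts, to.textFile(args(1)))
    }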

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PipelineHelper.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PipelineHelper.scala b/scrunch/src/main/scala/org/apache/scrunch/PipelineHelper.scala
deleted file mode 100644
index cdeb37b..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/PipelineHelper.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-/**
- * This trait provides convenience methods for building pipelines.
- */
-trait PipelineHelper {
-  /**
-   * Materializes the specified PCollection and displays its contents.
-   */
-  def dump(data: PCollection[_]) {
-    data.materialize.foreach(println(_))
-  }
-
-  /**
-   * Materializes the specified PTable and displays its contents.
-   */
-  def dump(data: PTable[_, _]) {
-    data.materialize.foreach(println(_))
-  }
-
-  /**
-   * Performs a cogroup on the two specified PTables.
-   */
-  def cogroup[K : PTypeH, V1 : PTypeH, V2 : PTypeH](t1: PTable[K, V1], t2: PTable[K, V2])
-      : PTable[K, (Iterable[V1], Iterable[V2])] = {
-    t1.cogroup(t2)
-  }
-
-  /**
-   * Performs an inner join on the two specified PTables.
-   */
-  def join[K : PTypeH, V1 : PTypeH, V2 : PTypeH](t1: PTable[K, V1], t2: PTable[K, V2])
-      : PTable[K, (V1, V2)] = {
-    t1.join(t2)
-  }
-
-  /**
-   * Unions the specified PCollections.
-   */
-  def union[T](first: PCollection[T], others: PCollection[T]*)
-      : PCollection[T] = {
-    first.union(others: _*)
-  }
-
-  /**
-   * Unions the specified PTables.
-   */
-  def union[K, V](first: PTable[K, V], others: PTable[K, V]*)
-      : PTable[K, V] = {
-    first.union(others: _*)
-  }
-}
-
-/**
- * Companion object containing convenience methods for building pipelines.
- */
-object PipelineHelper extends PipelineHelper
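
For context, a sketch (not part of this patch) using the companion object's methods; the two tables are assumed to share a user-id key, with implicit PTypeH instances for Long, String, and Double in scope:

    import org.apache.scrunch.PTable
    import org.apache.scrunch.PipelineHelper._

    object HelperSketch {
      def inspect(users: PTable[Long, String], orders: PTable[Long, Double]) {
        // Inner join, then print each (id, (name, amount)) to the console.
        dump(join(users, orders))
        // Cogroup keeps all values per key: (id, (names, amounts)).
        dump(cogroup(users, orders))
      }
    }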

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala b/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala
deleted file mode 100644
index 92dbaf3..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.hadoop.conf.Configuration
-
-import org.apache.crunch.{Pipeline => JPipeline}
-import org.apache.crunch.Source
-import org.apache.crunch.TableSource
-import org.apache.crunch.Target
-import org.apache.scrunch.interpreter.InterpreterRunner
-
-trait PipelineLike {
-  def jpipeline: JPipeline
-
-  /**
-   * Gets the configuration object associated with this pipeline.
-   */
-  def getConfiguration(): Configuration = jpipeline.getConfiguration()
-
-  /**
-   * Reads a source into a [[org.apache.scrunch.PCollection]]
-   *
-   * @param source The source to read from.
-   * @tparam T The type of the values being read.
-   * @return A PCollection containing data read from the specified source.
-   */
-  def read[T](source: Source[T]): PCollection[T] = new PCollection(jpipeline.read(source))
-
-  /**
-   * Reads a source into a [[org.apache.scrunch.PTable]]
-   *
-   * @param source The source to read from.
-   * @tparam K The type of the keys being read.
-   * @tparam V The type of the values being read.
-   * @return A PTable containing data read from the specified source.
-   */
-  def read[K, V](source: TableSource[K, V]): PTable[K, V] = new PTable(jpipeline.read(source))
-
-  /**
-   * Writes a parallel collection to a target.
-   *
-   * @param collection The collection to write.
-   * @param target The destination target for this write.
-   */
-  def write(collection: PCollection[_], target: Target): Unit = jpipeline.write(collection.native, target)
-
-  /**
-   * Writes a parallel table to a target.
-   *
-   * @param table The table to write.
-   * @param target The destination target for this write.
-   */
-  def write(table: PTable[_, _], target: Target): Unit = jpipeline.write(table.native, target)
-
-  /**
-   * Constructs and executes a series of MapReduce jobs in order
-   * to write data to the output targets.
-   */
-  def run(): Unit = {
-    InterpreterRunner.addReplJarsToJob(getConfiguration())
-    jpipeline.run()
-  }
-
-  /**
-   * Run any remaining jobs required to generate outputs and then
-   * clean up any intermediate data files that were created in
-   * this run or previous calls to `run`.
-   */
-  def done(): Unit = jpipeline.done()
-
-  /**
-   * Turn on debug logging for jobs that are run from this pipeline.
-   */
-  def debug(): Unit = jpipeline.enableDebug()
-}
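
For context, a sketch (not part of this patch) of the read/write/run life cycle; the paths are hypothetical, and From/To are Crunch's standard source/target factories:

    import org.apache.hadoop.conf.Configuration
    import org.apache.crunch.io.{From, To}
    import org.apache.scrunch.Pipeline

    object LifeCycleSketch {
      def main(args: Array[String]) {
        val p = Pipeline.mapReduce(getClass, new Configuration())
        val events = p.read(From.textFile("/data/events"))   // PCollection[String]
        p.write(events, To.textFile("/data/events-copy"))
        p.done() // run any outstanding jobs, then clean up temporary files
      }
    }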

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/interpreter/InterpreterRunner.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/interpreter/InterpreterRunner.scala b/scrunch/src/main/scala/org/apache/scrunch/interpreter/InterpreterRunner.scala
deleted file mode 100644
index e37a0c7..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/interpreter/InterpreterRunner.scala
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch.interpreter
-
-import java.io.File
-import java.io.FileOutputStream
-import java.util.jar.JarEntry
-import java.util.jar.JarOutputStream
-
-import scala.tools.nsc.GenericRunnerCommand
-import scala.tools.nsc.Global
-import scala.tools.nsc.MainGenericRunner
-import scala.tools.nsc.ObjectRunner
-import scala.tools.nsc.Properties
-import scala.tools.nsc.ScriptRunner
-import scala.tools.nsc.interpreter.ILoop
-import scala.tools.nsc.io.Jar
-import scala.tools.nsc.io.VirtualDirectory
-
-import com.google.common.io.Files
-import org.apache.commons.io.IOUtils
-import org.apache.hadoop.conf.Configuration
-
-import org.apache.crunch.util.DistCache
-
-/**
- * An object used to run a Scala REPL with modifications to facilitate Scrunch jobs running
- * within the REPL.
- */
-object InterpreterRunner extends MainGenericRunner {
-
-  // The actual Scala repl.
-  var repl: ILoop = null
-
-  /**
-   * Checks whether or not the Scala repl has been started.
-   *
-   * @return <code>true</code> if the repl is running, <code>false</code> otherwise.
-   */
-  def isReplRunning() = repl != null
-
-  /**
-   * The main entry point for the REPL.  This method is lifted from
-   * {@link scala.tools.nsc.MainGenericRunner} and modified to facilitate testing whether or not
-   * the REPL is actually running.
-   *
-   * @param args Arguments used on the command line to start the REPL.
-   * @return <code>true</code> if execution was successful, <code>false</code> otherwise.
-   */
-  override def process(args: Array[String]): Boolean = {
-    val command = new GenericRunnerCommand(args.toList, (x: String) => errorFn(x))
-    import command.{settings, howToRun, thingToRun}
-    // Defines a nested function to retrieve a sample compiler if necessary.
-    def sampleCompiler = new Global(settings)
-
-    import Properties.{versionString, copyrightString}
-    if (!command.ok) {
-      return errorFn("\n" + command.shortUsageMsg)
-    } else if (settings.version.value) {
-      return errorFn("Scala code runner %s -- %s".format(versionString, copyrightString))
-    } else if (command.shouldStopWithInfo) {
-      return errorFn(command getInfoMessage sampleCompiler)
-    }
-
-    // Functions to retrieve settings values that were passed to REPL invocation.
-    // The -e argument provides a Scala statement to execute.
-    // The -i option requests that a file be preloaded into the interactive shell.
-    def isE = !settings.execute.isDefault
-    def dashe = settings.execute.value
-    def isI = !settings.loadfiles.isDefault
-    def dashi = settings.loadfiles.value
-
-    // Function to retrieve code passed by -e and -i options to REPL.
-    def combinedCode = {
-      val files = if (isI) dashi map (file => scala.tools.nsc.io.File(file).slurp()) else Nil
-      val str = if (isE) List(dashe) else Nil
-      files ++ str mkString "\n\n"
-    }
-
-    import GenericRunnerCommand._
-
-    // Function for running the target command. It can run an object with main, a script, or
-    // an interactive REPL.
-    def runTarget(): Either[Throwable, Boolean] = howToRun match {
-      case AsObject =>
-        ObjectRunner.runAndCatch(settings.classpathURLs, thingToRun, command.arguments)
-      case AsScript =>
-        ScriptRunner.runScriptAndCatch(settings, thingToRun, command.arguments)
-      case AsJar =>
-        ObjectRunner.runAndCatch(
-          scala.tools.nsc.io.File(thingToRun).toURL +: settings.classpathURLs,
-          new Jar(thingToRun).mainClass getOrElse sys.error("Cannot find main class for jar: " +
-            thingToRun),
-          command.arguments
-        )
-      case Error =>
-        Right(false)
-      case _ =>
-        // We start the shell when no arguments are given.
-        repl = new ILoop
-        Right(repl.process(settings))
-    }
-
-    /** If -e and -i were both given, we want to execute the -e code after the
-     *  -i files have been included, so they are read into strings and prepended to
-     *  the code given in -e.  The -i option is documented to only make sense
-     *  interactively so this is a pretty reasonable assumption.
-     *
-     *  This all needs a rewrite though.
-     */
-    if (isE) {
-      ScriptRunner.runCommand(settings, combinedCode, thingToRun +: command.arguments)
-    }
-    else runTarget() match {
-      case Left(ex) => errorFn(ex)
-      case Right(b) => b
-    }
-  }
-
-  def main(args: Array[String]) {
-    val retVal = process(args)
-    if (!retVal)
-      sys.exit(1)
-  }
-
-  /**
-   * Creates a jar file containing the code thus far compiled by the REPL in a temporary directory.
-   *
-   * @return A file object representing the jar file created.
-   */
-  def createReplCodeJar(): File = {
-    var jarStream: JarOutputStream = null
-    try {
-      val virtualDirectory = repl.virtualDirectory
-      val tempDir = Files.createTempDir()
-      val tempJar = new File(tempDir, "replJar.jar")
-      jarStream = new JarOutputStream(new FileOutputStream(tempJar))
-      addVirtualDirectoryToJar(virtualDirectory, "", jarStream)
-      return tempJar
-    } finally {
-      IOUtils.closeQuietly(jarStream)
-    }
-  }
-
-  /**
-   * Add the contents of the specified virtual directory to a jar. This method will recursively
-   * descend into subdirectories to add their contents.
-   *
-   * @param dir The virtual directory whose contents should be added.
-   * @param entryPath The entry path for classes found in the virtual directory.
-   * @param jarStream An output stream for writing the jar file.
-   */
-  def addVirtualDirectoryToJar(dir: VirtualDirectory, entryPath: String, jarStream:
-      JarOutputStream): Unit = {
-    dir.foreach { file =>
-      if (file.isDirectory) {
-        // Recursively descend into subdirectories, adjusting the package name as we do.
-        val dirPath = entryPath + file.name + "/"
-        val entry: JarEntry = new JarEntry(dirPath)
-        jarStream.putNextEntry(entry)
-        jarStream.closeEntry()
-        addVirtualDirectoryToJar(file.asInstanceOf[VirtualDirectory],
-            dirPath, jarStream)
-      } else if (file.hasExtension("class")) {
-        // Add class files as an entry in the jar file and write the class to the jar.
-        val entry: JarEntry = new JarEntry(entryPath + file.name)
-        jarStream.putNextEntry(entry)
-        jarStream.write(file.toByteArray)
-        jarStream.closeEntry()
-      }
-    }
-  }
-
-  /**
-   * Generates a jar containing the code thus far compiled by the REPL,
-   * and adds that jar file to the distributed cache of jobs using the specified configuration.
-   * Also adds any jars added with the :cp command to the user's job.
-   *
-   * @param configuration The configuration of jobs that should use the REPL code jar.
-   */
-  def addReplJarsToJob(configuration: Configuration): Unit = {
-    if (repl != null) {
-      // Generate a jar of REPL code and add to the distributed cache.
-      val replJarFile = createReplCodeJar()
-      DistCache.addJarToDistributedCache(configuration, replJarFile)
-      // Get the paths to jars added with the :cp command.
-      val addedJarPaths = repl.addedClasspath.split(':')
-      addedJarPaths.foreach {
-        path => if (path.endsWith(".jar")) DistCache.addJarToDistributedCache(configuration, path)
-      }
-    }
-  }
-}
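
For context, the same packaging idea sketched (not part of this patch) against an on-disk directory with plain java.io, mirroring what addVirtualDirectoryToJar does for the REPL's in-memory VirtualDirectory:

    import java.io.{File, FileInputStream, FileOutputStream}
    import java.util.jar.{JarEntry, JarOutputStream}

    object JarSketch {
      // Walk a directory of compiled classes and mirror it into a jar, one
      // entry per subdirectory and per .class file, extending the entry path
      // as we descend.
      def addDirToJar(dir: File, entryPath: String, jar: JarOutputStream) {
        for (f <- dir.listFiles()) {
          if (f.isDirectory) {
            val dirPath = entryPath + f.getName + "/"
            jar.putNextEntry(new JarEntry(dirPath))
            jar.closeEntry()
            addDirToJar(f, dirPath, jar)
          } else if (f.getName.endsWith(".class")) {
            jar.putNextEntry(new JarEntry(entryPath + f.getName))
            val in = new FileInputStream(f)
            val buf = new Array[Byte](4096)
            var n = in.read(buf)
            while (n != -1) { jar.write(buf, 0, n); n = in.read(buf) }
            in.close()
            jar.closeEntry()
          }
        }
      }

      def main(args: Array[String]) {
        // args(0): directory of compiled classes; output path is hypothetical.
        val jar = new JarOutputStream(new FileOutputStream("/tmp/classes.jar"))
        try { addDirToJar(new File(args(0)), "", jar) } finally { jar.close() }
      }
    }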

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scripts/imports.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scripts/imports.scala b/scrunch/src/main/scripts/imports.scala
deleted file mode 100644
index 64d7149..0000000
--- a/scrunch/src/main/scripts/imports.scala
+++ /dev/null
@@ -1,19 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.scrunch._
-

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scripts/scrunch
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scripts/scrunch b/scrunch/src/main/scripts/scrunch
deleted file mode 100755
index 44cb6fb..0000000
--- a/scrunch/src/main/scripts/scrunch
+++ /dev/null
@@ -1,163 +0,0 @@
-#!/bin/bash --posix
-#
-##############################################################################
-# Copyright 2002-2011, LAMP/EPFL
-#
-# This is free software; see the distribution for copying conditions.
-# There is NO warranty; not even for MERCHANTABILITY or FITNESS FOR A
-# PARTICULAR PURPOSE.
-##############################################################################
-
-# Identify the bin dir in the distribution from which this script is running.
-bin=`dirname $0`
-bin=`cd ${bin} && pwd`
-
-# Set the directory where libraries for scrunch shell live.
-SCRUNCH_LIB_DIR="${bin}/../lib"
-# Set the conf directory for the scrunch distribution.
-SCRUNCH_CONF_DIR="${bin}/../conf"
-# Set the main class used to run scrunch shell.
-MAIN_CLASS="org.apache.scrunch.interpreter.InterpreterRunner"
-
-# Not sure what the right default is here: trying nonzero.
-scala_exit_status=127
-saved_stty=""
-
-# restore stty settings (echo in particular)
-function restoreSttySettings() {
-  if [[ -n $SCALA_RUNNER_DEBUG ]]; then
-    echo "restoring stty: $saved_stty"
-  fi
-
-  stty $saved_stty
-  saved_stty=""
-}
-
-function onExit() {
-  if [[ "$saved_stty" != "" ]]; then
-    restoreSttySettings
-    exit $scala_exit_status
-  fi
-}
-
-# to reenable echo if we are interrupted before completing.
-trap onExit INT
-
-# save terminal settings
-saved_stty=$(stty -g 2>/dev/null)
-# clear on error so we don't later try to restore them
-if [[ $? != 0 ]]; then
-  saved_stty=""
-fi
-if [[ -n $SCALA_RUNNER_DEBUG ]]; then
-  echo "saved stty: $saved_stty"
-fi
-
-cygwin=false;
-case "`uname`" in
-    CYGWIN*) cygwin=true ;;
-esac
-
-# Constructing scrunch shell classpath.
-SCRUNCH_SHELL_CLASSPATH=""
-# Add files in conf dir.
-for ext in "$SCRUNCH_CONF_DIR"/* ; do
-    if [ -z "$SCRUNCH_SHELL_CLASSPATH" ] ; then
-        SCRUNCH_SHELL_CLASSPATH="$ext"
-    else
-        SCRUNCH_SHELL_CLASSPATH="$SCRUNCH_SHELL_CLASSPATH:$ext"
-    fi
-done
-# Add files in lib dir.
-for ext in "$SCRUNCH_LIB_DIR"/*.jar ; do
-    if [ -z "$SCRUNCH_SHELL_CLASSPATH" ] ; then
-        SCRUNCH_SHELL_CLASSPATH="$ext"
-    else
-        SCRUNCH_SHELL_CLASSPATH="$SCRUNCH_SHELL_CLASSPATH:$ext"
-    fi
-done
-
-# Constructing Hadoop classpath.
-if [ -z "$HADOOP_HOME" ]; then
-    echo "HADOOP_HOME must be set to run the Scrunch shell."
-    exit 1
-fi
-HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
-
-CYGWIN_JLINE_TERMINAL=
-if $cygwin; then
-    if [ "$OS" = "Windows_NT" ] && cygpath -m .>/dev/null 2>/dev/null ; then
-        format=mixed
-    else
-        format=windows
-    fi
-    SCRUNCH_SHELL_CLASSPATH=`cygpath --path --$format "$SCRUNCH_SHELL_CLASSPATH"`
-    case "$TERM" in
-        rxvt* | xterm*)
-            stty -icanon min 1 -echo
-            CYGWIN_JLINE_TERMINAL="-Djline.terminal=scala.tools.jline.UnixTerminal"
-        ;;
-    esac
-fi
-
-[ -n "$JAVA_OPTS" ] || JAVA_OPTS="-Xmx256M -Xms32M"
-
-# break out -D and -J options and add them to JAVA_OPTS as well
-# so they reach the underlying JVM in time to do some good.  The
-# -D options will be available as system properties.
-declare -a java_args
-declare -a scala_args
-
-# Don't use the bootstrap classloader.
-CPSELECT="-classpath "
-
-while [ $# -gt 0 ]; do
-  case "$1" in
-    -D*)
-      # pass to scala as well: otherwise we lose it sometimes when we
-      # need it, e.g. communicating with a server compiler.
-      java_args=("${java_args[@]}" "$1")
-      scala_args=("${scala_args[@]}" "$1")
-      shift
-      ;;
-    -J*)
-      # as with -D, pass to scala even though it will almost
-      # never be used.
-      java_args=("${java_args[@]}" "${1:2}")
-      scala_args=("${scala_args[@]}" "$1")
-      shift
-      ;;
-    -toolcp)
-      TOOL_CLASSPATH="$TOOL_CLASSPATH:$2"
-      shift 2
-      ;;
-    *)
-      scala_args=("${scala_args[@]}" "$1")
-      shift
-      ;;
-  esac
-done
-
-# reset "$@" to the remaining args
-set -- "${scala_args[@]}"
-
-if [ -z "$JAVACMD" -a -n "$JAVA_HOME" -a -x "$JAVA_HOME/bin/java" ]; then
-    JAVACMD="$JAVA_HOME/bin/java"
-fi
-
-"${JAVACMD:=java}" \
-  $JAVA_OPTS \
-  "${java_args[@]}" \
-  ${CPSELECT}${TOOL_CLASSPATH}":"${SCRUNCH_SHELL_CLASSPATH}":"${HADOOP_CLASSPATH} \
-  -Dscala.usejavacp=true \
-  -Denv.emacs="$EMACS" \
-  $CYGWIN_JLINE_TERMINAL \
-  $MAIN_CLASS  "$@" \
-  -i ${bin}/imports.scala \
-  -Yrepl-sync
-# The -Yrepl-sync option is a fix for the 2.9.1 REPL. This should probably not be necessary in the future.
-
-# record the exit status lest it be overwritten:
-# then reenable echo and propagate the code.
-scala_exit_status=$?
-onExit
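
For context, once this script launches InterpreterRunner with imports.scala preloaded, a session might look like the following sketch (not part of this patch; the path is hypothetical). Note that materialize ships the REPL-compiled classes to the job via addReplJarsToJob:

    // Typed at the Scrunch shell prompt; org.apache.scrunch._ is already
    // imported via -i imports.scala.
    val p = Pipeline.inMemory
    val lines = p.readTextFile("/tmp/sample.txt")
    lines.filter(!_.isEmpty()).materialize.foreach(println)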

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scripts/scrunch-job.py
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scripts/scrunch-job.py b/scrunch/src/main/scripts/scrunch-job.py
deleted file mode 100755
index 2a61b3b..0000000
--- a/scrunch/src/main/scripts/scrunch-job.py
+++ /dev/null
@@ -1,133 +0,0 @@
-#!/usr/bin/python
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import glob
-import os
-import re
-import shutil
-import subprocess
-import sys
-
-# Configuration in script
-##############################################################
-if not "SCALA_HOME" in os.environ:
-  sys.stderr.write("Environment variable SCALA_HOME must be set\n")
-  sys.exit(1)
-SCALA_LIB = os.path.join(os.environ["SCALA_HOME"], "lib")
-
-if not "HADOOP_HOME" in os.environ:
-  sys.stderr.write("Environment variable HADOOP_HOME must be set\n")
-  sys.exit(1)
-HADOOP_HOME = os.environ["HADOOP_HOME"]
-HADOOP_JARS = ":".join(glob.glob(os.path.join(HADOOP_HOME, "*.jar")))
-
-#Get the absolute path of the original (non-symlink) file.
-if os.path.islink(__file__):
-  ORIGINAL_FILE = os.readlink(__file__)
-else:
-  ORIGINAL_FILE = __file__
-
-DIST_ROOT = os.path.abspath(os.path.dirname(ORIGINAL_FILE)+"/../")
-LIB_DIR = DIST_ROOT + "/lib" # Dir with all scrunch dependencies.
-TMPDIR = "/tmp"
-BUILDDIR = TMPDIR + "/script-build"
-COMPILE_CMD = "java -cp %s/scala-library.jar:%s/scala-compiler.jar -Dscala.home=%s scala.tools.nsc.Main" % (SCALA_LIB, SCALA_LIB, SCALA_LIB)
-##############################################################
-
-argv = sys.argv[1:]
-if len(argv) < 1:
-  sys.stderr.write("ERROR: insufficient args.\n")
-  sys.exit(1)
-
-JOBFILE = argv.pop(0)
-
-def file_type():
-  m = re.search(r'\.(scala|java)$', JOBFILE)
-  if m:
-    return m.group(1)
-  return None
-
-def is_file():
-  return file_type() is not None
-
-PACK_RE = r'package ([^;]+)'
-OBJECT_RE = r'object\s+([^\s(]+).*(extends|with)\s+PipelineApp.*'
-EXTENSION_RE = r'(.*)\.(scala|java)$'
-
-# Get the name of the job from the file.
-# The rule: use the last PipelineApp object in the file, or the one whose name matches the filename.
-def get_job_name(file):
-  package = ""
-  job = None
-  default = None
-  match = re.search(EXTENSION_RE, file)
-  if match:
-    default = match.group(1)
-    for s in open(file, "r"):
-      mp = re.search(PACK_RE, s)
-      mo = re.search(OBJECT_RE, s)
-      if mp:
-        package = mp.group(1).strip() + "."
-      elif mo:
-        if not job or not default or job.lower() != default.lower():
-          # use either the last object, or the one with the same name as the file
-          job = mo.group(1)
-    if not job:
-      raise Exception("Could not find job name")
-    return "%s%s" % (package, job)
-  else:
-    return file
-
-LIB_PATH = os.path.abspath(LIB_DIR)
-if not os.path.exists(LIB_PATH):
-  sys.stderr.write("Scrunch distribution lib directory not found; run mvn package to construct a distribution to run examples from.\n")
-  sys.exit(1)
-LIB_JARS = glob.glob(os.path.join(LIB_PATH, "*.jar"))
-LIB_CP = ":".join(LIB_JARS)
-
-JOBPATH = os.path.abspath(JOBFILE)
-JOB = get_job_name(JOBFILE)
-JOBJAR = JOB + ".jar"
-JOBJARPATH = os.path.join(TMPDIR, JOBJAR)
-
-def needs_rebuild():
-  return not os.path.exists(JOBJARPATH) or os.stat(JOBJARPATH).st_mtime < os.stat(JOBPATH).st_mtime
-
-def build_job_jar():
-  sys.stderr.write("compiling " + JOBFILE + "\n")
-  if os.path.exists(BUILDDIR):
-    shutil.rmtree(BUILDDIR)
-  os.makedirs(BUILDDIR)
-  cmd = "%s -classpath %s:%s -d %s %s" % (COMPILE_CMD, LIB_CP, HADOOP_JARS, BUILDDIR, JOBFILE)
-  print cmd
-  if subprocess.call(cmd, shell=True):
-    shutil.rmtree(BUILDDIR)
-    sys.exit(1)
-
-  jar_cmd = "jar cf %s -C %s ." % (JOBJARPATH, BUILDDIR)
-  subprocess.call(jar_cmd, shell=True)
-  shutil.rmtree(BUILDDIR)
-
-def hadoop_command():
-  return "HADOOP_CLASSPATH=%s ; %s/bin/hadoop jar %s %s %s" % (LIB_CP, HADOOP_HOME, JOBJARPATH, JOB, " ".join(argv))
-
-if is_file() and needs_rebuild():
-  build_job_jar()
-
-SHELL_COMMAND = hadoop_command()
-print SHELL_COMMAND
-os.system(SHELL_COMMAND)
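
For context, a job file this script would accept (not part of this patch; all names are hypothetical). PACK_RE picks up the package, OBJECT_RE matches the object extending PipelineApp, and because the object name matches the file name, get_job_name() submits com.example.MaxTemp. The groupByKey/combine calls assume the PGroupedTable API deleted earlier in this patch series, and map assumes the implicit PTypeH machinery for tuples:

    // MaxTemp.scala -- run as: scrunch-job.py MaxTemp.scala <input> <output>
    package com.example

    import org.apache.scrunch.PipelineApp

    object MaxTemp extends PipelineApp {
      val readings = pipeline.read(from.textFile(args(0)))
      val maxes = readings
        .map { line => val f = line.split("\t"); (f(0), f(1).toInt) } // (station, temp)
        .groupByKey
        .combine(temps => temps.max)
      pipeline.write(maxes, to.textFile(args(1)))
    }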

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/scrunch/src/site/markdown/index.md b/scrunch/src/site/markdown/index.md
deleted file mode 100644
index 32a9279..0000000
--- a/scrunch/src/site/markdown/index.md
+++ /dev/null
@@ -1,20 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-# Scrunch - A Scala Wrapper for Crunch
----

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/site/site.xml
----------------------------------------------------------------------
diff --git a/scrunch/src/site/site.xml b/scrunch/src/site/site.xml
deleted file mode 100644
index 73fbd17..0000000
--- a/scrunch/src/site/site.xml
+++ /dev/null
@@ -1,34 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project name="${project.name}"
-  xmlns="http://maven.apache.org/DECORATION/1.3.0"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/DECORATION/1.3.0
-                      http://maven.apache.org/xsd/decoration-1.3.0.xsd">
-
-  <body>
-    <!-- Note: Breadcrumbs for Doxia's Markdown parser are currently broken,
-               see https://jira.codehaus.org/browse/DOXIA-472 -->
-    <breadcrumbs>
-      <item name="Apache" href="http://www.apache.org/index.html" />
-      <item name="Crunch" href="../index.html"/>
-    </breadcrumbs>
-
-  </body>
-
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/src/site/markdown/scrunch.md
----------------------------------------------------------------------
diff --git a/src/site/markdown/scrunch.md b/src/site/markdown/scrunch.md
index 7d0585a..324b88a 100644
--- a/src/site/markdown/scrunch.md
+++ b/src/site/markdown/scrunch.md
@@ -32,9 +32,9 @@ a mixture of functional and object-oriented programming styles and has powerful
 capabilities, allowing us to create complex pipelines using very few keystrokes. Here is
 the Scrunch analogue of the classic WordCount problem:
 
-	import com.cloudera.crunch.io.{From => from}
-	import com.cloudera.scrunch._
-	import com.cloudera.scrunch.Conversions_  # For implicit type conversions
+	import org.apache.crunch.io.{From => from}
+	import org.apache.crunch.scrunch._
+	import org.apache.crunch.scrunch.Conversions_  // For implicit type conversions
 
 	class WordCountExample {
 	  val pipeline = new Pipeline[WordCountExample]
@@ -80,7 +80,6 @@ the output of a Crunch pipeline into the client:
 Scrunch is alpha-quality code, written by someone who was learning Scala on the fly. There will be bugs,
 rough edges, and non-idiomatic Scala usage all over the place. This will improve with time, and we welcome
 contributions from Scala experts who are interested in helping us make Scrunch into a first-class project.
-The Crunch developers mailing list is [here](https://groups.google.com/a/cloudera.org/group/crunch-dev/topics).
 
 Scrunch emerged out of conversations with [Dmitriy Ryaboy](http://twitter.com/#!/squarecog),
 [Oscar Boykin](http://twitter.com/#!/posco), and [Avi Bryant](http://twitter.com/#!/avibryant) from Twitter.