You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/08/08 02:47:06 UTC
[10/10] CRUNCH-32: Clean up namespaces.
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PTable.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PTable.scala b/scrunch/src/main/scala/org/apache/scrunch/PTable.scala
deleted file mode 100644
index 7f506e5..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/PTable.scala
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import java.util.{Collection => JCollect}
-
-import scala.collection.JavaConversions._
-
-import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
-import org.apache.crunch.{GroupingOptions, PTable => JTable, Pair => CPair}
-import org.apache.crunch.lib.{Join, Cartesian, Aggregate, Cogroup, PTables}
-import org.apache.scrunch.interpreter.InterpreterRunner
-
-/**
- * Scala wrapper around a Crunch [[org.apache.crunch.PTable]] that exposes table operations
- * using Scala closures and tuples.
- *
- * @param native the underlying Java PTable that every operation here delegates to.
- */
-class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V], PTable[K, V], JTable[K, V]] {
- import PTable._
-
- /** Keeps only the entries for which `f(key, value)` is true; the table's PType is unchanged. */
- def filter(f: (K, V) => Boolean): PTable[K, V] = {
- parallelDo(filterFn[K, V](f), native.getPTableType())
- }
-
- /**
- * Applies `f` to every (key, value) entry. The result type `To` is selected by the implicit
- * `CanParallelTransform[T, To]`, and the element PType comes from `pt` in this table's family.
- */
- def map[T, To](f: (K, V) => T)
- (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
- b(this, mapFn(f), pt.get(getTypeFamily()))
- }
-
- /** Transforms each value with `f`; keys and the key PType are kept as-is. */
- def mapValues[T](f: V => T)(implicit pt: PTypeH[T]) = {
- val ptf = getTypeFamily()
- val ptype = ptf.tableOf(native.getKeyType(), pt.get(ptf))
- parallelDo(mapValuesFn[K, V, T](f), ptype)
- }
-
- /** Transforms each key with `f`; values and the value PType are kept as-is. */
- def mapKeys[T](f: K => T)(implicit pt: PTypeH[T]) = {
- val ptf = getTypeFamily()
- val ptype = ptf.tableOf(pt.get(ptf), native.getValueType())
- parallelDo(mapKeysFn[K, V, T](f), ptype)
- }
-
- /**
- * Applies `f` to every entry and emits each element of the returned Traversable. The result
- * type `To` is selected by the implicit `CanParallelTransform[T, To]`.
- */
- def flatMap[T, To](f: (K, V) => Traversable[T])
- (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
- b(this, flatMapFn(f), pt.get(getTypeFamily()))
- }
-
- /** Unions this table with `others` into a single new PTable. */
- def union(others: PTable[K, V]*) = {
- new PTable[K, V](native.union(others.map(_.native) : _*))
- }
-
- /** The keys of this table as a PCollection (delegates to `PTables.keys`). */
- def keys() = new PCollection[K](PTables.keys(native))
-
- /** The values of this table as a PCollection (delegates to `PTables.values`). */
- def values() = new PCollection[V](PTables.values(native))
-
- /**
- * Cogroups this table with `other` by key. Each key maps to a pair of Scala Iterables: the
- * values from this table and the values from `other`.
- */
- def cogroup[V2](other: PTable[K, V2]) = {
- val jres = Cogroup.cogroup[K, V, V2](this.native, other.native)
- val ptf = getTypeFamily()
- val inter = new PTable[K, CPair[JCollect[V], JCollect[V2]]](jres)
- // Convert Crunch's pair of java.util.Collections into a Scala tuple of Iterables.
- inter.parallelDo(new SMapTableValuesFn[K, CPair[JCollect[V], JCollect[V2]], (Iterable[V], Iterable[V2])] {
- def apply(x: CPair[JCollect[V], JCollect[V2]]) = {
- (collectionAsScalaIterable[V](x.first()), collectionAsScalaIterable[V2](x.second()))
- }
- }, ptf.tableOf(keyType, ptf.tuple2(ptf.collections(valueType), ptf.collections(other.valueType))))
- }
-
- /** Signature of a Crunch join implementation over the two native tables. */
- type JoinFn[V2] = (JTable[K, V], JTable[K, V2]) => JTable[K, CPair[V, V2]]
-
- /** Runs `joinFn` on the native tables and converts the joined Crunch pairs into Scala tuples. */
- protected def join[V2](joinFn: JoinFn[V2], other: PTable[K, V2]): PTable[K, (V, V2)] = {
- val jres = joinFn(this.native, other.native)
- val ptf = getTypeFamily()
- val ptype = ptf.tableOf(keyType, ptf.tuple2(valueType, other.valueType))
- val inter = new PTable[K, CPair[V, V2]](jres)
- inter.parallelDo(new SMapTableValuesFn[K, CPair[V, V2], (V, V2)] {
- def apply(x: CPair[V, V2]) = (x.first(), x.second())
- }, ptype)
- }
-
- /** Same as [[innerJoin]]. */
- def join[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
- innerJoin(other)
- }
-
- /** Inner join on key via Crunch's `Join.innerJoin`. */
- def innerJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
- join[V2](Join.innerJoin[K, V, V2](_, _), other)
- }
-
- /**
- * Left outer join on key via Crunch's `Join.leftJoin`.
- * NOTE(review): unmatched right-hand values are presumably null in the tuple; confirm against Crunch.
- */
- def leftJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
- join[V2](Join.leftJoin[K, V, V2](_, _), other)
- }
-
- /**
- * Right outer join on key via Crunch's `Join.rightJoin`.
- * NOTE(review): unmatched left-hand values are presumably null in the tuple; confirm against Crunch.
- */
- def rightJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
- join[V2](Join.rightJoin[K, V, V2](_, _), other)
- }
-
- /**
- * Full outer join on key via Crunch's `Join.fullJoin`.
- * NOTE(review): the missing side is presumably null in the tuple; confirm against Crunch.
- */
- def fullJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
- join[V2](Join.fullJoin[K, V, V2](_, _), other)
- }
-
- /**
- * Cartesian product of this table with `other` (via Crunch's `Cartesian.cross`). Keys and
- * values of matched entries are combined into Scala tuples.
- */
- def cross[K2, V2](other: PTable[K2, V2]): PTable[(K, K2), (V, V2)] = {
- val ptf = getTypeFamily()
- val inter = new PTable(Cartesian.cross(this.native, other.native))
- val f = (k: CPair[K,K2], v: CPair[V,V2]) => CPair.of((k.first(), k.second()), (v.first(), v.second()))
- inter.parallelDo(mapFn(f), ptf.tableOf(ptf.tuple2(keyType, other.keyType), ptf.tuple2(valueType, other.valueType)))
- }
-
- /** Delegates to Crunch's `Aggregate.top` with the given `limit` and `maximize` flag. */
- def top(limit: Int, maximize: Boolean) = {
- wrap(Aggregate.top(this.native, limit, maximize))
- }
-
- /** Groups this table by key using Crunch's default grouping. */
- def groupByKey() = new PGroupedTable(native.groupByKey())
-
- /** Groups this table by key using the given number of partitions. */
- def groupByKey(partitions: Int) = new PGroupedTable(native.groupByKey(partitions))
-
- /** Groups this table by key using the given Crunch grouping options. */
- def groupByKey(options: GroupingOptions) = new PGroupedTable(native.groupByKey(options))
-
- /**
- * Wraps a native Crunch table produced by a delegated operation. The cast to
- * `JTable[K, V]` is unchecked with respect to the type arguments.
- */
- def wrap(newNative: AnyRef) = {
- new PTable[K, V](newNative.asInstanceOf[JTable[K, V]])
- }
-
- /** Returns the native Crunch table underlying `sc`. */
- def unwrap(sc: PTable[K, V]): JTable[K, V] = sc.native
-
- /**
- * Adds REPL jars to the job configuration, then materializes the table. Entries are exposed
- * as Scala tuples through a lazy view, so the conversion is re-applied on every traversal.
- */
- def materialize(): Iterable[(K, V)] = {
- InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
- native.materialize.view.map(x => (x.first, x.second))
- }
-
- /** Adds REPL jars to the job configuration, then copies the materialized table into an immutable Map. */
- def materializeToMap(): Map[K, V] = {
- InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
- native.materializeToMap().view.toMap
- }
-
- /** The PType of this table's keys. */
- def keyType() = native.getPTableType().getKeyType()
-
- /** The PType of this table's values. */
- def valueType() = native.getPTableType().getValueType()
-}
-
-/**
- * A Crunch FilterFn over table entries that is also a Scala `(K, V) => Boolean`.
- * `accept` unpacks the Crunch pair and forwards the key and value to `apply`.
- */
-trait SFilterTableFn[K, V] extends FilterFn[CPair[K, V]] with Function2[K, V, Boolean] {
-  override def accept(input: CPair[K, V]): Boolean = {
-    val key = input.first()
-    val value = input.second()
-    apply(key, value)
-  }
-}
-
-/**
- * A Crunch DoFn over table entries that is also a Scala `(K, V) => Traversable[T]`.
- * Every element returned by `apply` for an entry is emitted, in traversal order.
- */
-trait SDoTableFn[K, V, T] extends DoFn[CPair[K, V], T] with Function2[K, V, Traversable[T]] {
-  override def process(input: CPair[K, V], emitter: Emitter[T]): Unit = {
-    val outputs = apply(input.first(), input.second())
-    outputs.foreach(out => emitter.emit(out))
-  }
-}
-
-/**
- * A Crunch MapFn over table entries that is also a Scala `(K, V) => T`.
- * `map` unpacks the Crunch pair and returns `apply(key, value)`.
- */
-trait SMapTableFn[K, V, T] extends MapFn[CPair[K, V], T] with Function2[K, V, T] {
-  override def map(input: CPair[K, V]): T = {
-    val key = input.first()
-    val value = input.second()
-    apply(key, value)
-  }
-}
-
-/**
- * A Crunch MapFn that rewrites only the value of each table entry using `apply`;
- * the key is carried through unchanged.
- */
-trait SMapTableValuesFn[K, V, T] extends MapFn[CPair[K, V], CPair[K, T]] with Function1[V, T] {
-  override def map(input: CPair[K, V]): CPair[K, T] = {
-    val key = input.first()
-    CPair.of(key, apply(input.second()))
-  }
-}
-
-/**
- * A Crunch MapFn that rewrites only the key of each table entry using `apply`;
- * the value is carried through unchanged.
- */
-trait SMapTableKeysFn[K, V, T] extends MapFn[CPair[K, V], CPair[T, V]] with Function1[K, T] {
-  override def map(input: CPair[K, V]): CPair[T, V] = {
-    val newKey = apply(input.first())
-    CPair.of(newKey, input.second())
-  }
-}
-
-/**
- * Factory methods that wrap plain Scala closures in the Crunch-compatible table function
- * traits, so they can be handed to `parallelDo`.
- */
-object PTable {
-  /** Wraps a (key, value) predicate as an [[SFilterTableFn]]. */
-  def filterFn[K, V](fn: (K, V) => Boolean): SFilterTableFn[K, V] =
-    new SFilterTableFn[K, V] {
-      def apply(key: K, value: V): Boolean = fn(key, value)
-    }
-
-  /** Wraps a value transformation as an [[SMapTableValuesFn]]; keys pass through. */
-  def mapValuesFn[K, V, T](fn: V => T): SMapTableValuesFn[K, V, T] =
-    new SMapTableValuesFn[K, V, T] {
-      def apply(value: V): T = fn(value)
-    }
-
-  /** Wraps a key transformation as an [[SMapTableKeysFn]]; values pass through. */
-  def mapKeysFn[K, V, T](fn: K => T): SMapTableKeysFn[K, V, T] =
-    new SMapTableKeysFn[K, V, T] {
-      def apply(key: K): T = fn(key)
-    }
-
-  /** Wraps a (key, value) => T function as an [[SMapTableFn]]. */
-  def mapFn[K, V, T](fn: (K, V) => T): SMapTableFn[K, V, T] =
-    new SMapTableFn[K, V, T] {
-      def apply(key: K, value: V): T = fn(key, value)
-    }
-
-  /** Wraps a (key, value) => Traversable[T] function as an [[SDoTableFn]]. */
-  def flatMapFn[K, V, T](fn: (K, V) => Traversable[T]): SDoTableFn[K, V, T] =
-    new SDoTableFn[K, V, T] {
-      def apply(key: K, value: V): Traversable[T] = fn(key, value)
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PTypeFamily.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PTypeFamily.scala b/scrunch/src/main/scala/org/apache/scrunch/PTypeFamily.scala
deleted file mode 100644
index 1bd3db6..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/PTypeFamily.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.{Pair => CPair, Tuple3 => CTuple3, Tuple4 => CTuple4, MapFn}
-import org.apache.crunch.types.{PType, PTypeFamily => PTF}
-import org.apache.crunch.types.writable.WritableTypeFamily
-import org.apache.crunch.types.avro.{AvroTypeFamily, Avros => CAvros}
-import java.lang.{Long => JLong, Double => JDouble, Integer => JInt, Float => JFloat, Boolean => JBoolean}
-import java.util.{Collection => JCollection}
-import scala.collection.JavaConversions._
-
-/**
- * Adapts a Scala function `f` into a Crunch MapFn so it can be passed to Crunch APIs
- * such as `PTypeFamily.derived`.
- */
-class TMapFn[S, T](f: S => T) extends MapFn[S, T] {
-  override def map(input: S): T = f.apply(input)
-}
-
-/**
- * Scala-friendly factory for Crunch PTypes. Implementations supply the underlying Crunch type
- * family via `ptf`. Everything else is built on top of it, converting between the Java
- * representations (boxed primitives, java.util collections, Crunch tuples) and Scala ones.
- */
-trait PTypeFamily {
-
- /** The Crunch PTypeFamily that every PType produced here is built from. */
- def ptf: PTF
-
- // NOTE: the vals below call `ptf` while this trait initializes, so implementations must make
- // `ptf` usable before their own body runs (Writables and Avros implement it as a def).
- val strings = ptf.strings()
-
- val bytes = ptf.bytes()
-
- /** Delegates to `ptf.records` with the runtime class (erasure) of T. */
- def records[T: ClassManifest] = ptf.records(classManifest[T].erasure)
-
- /**
- * Derives a PType[T] from `pt: PType[S]`. `in` (S => T) and `out` (T => S) are wrapped as
- * Crunch MapFns via [[TMapFn]].
- */
- def derived[S, T](cls: java.lang.Class[T], in: S => T, out: T => S, pt: PType[S]) = {
- ptf.derived(cls, new TMapFn[S, T](in), new TMapFn[T, S](out), pt)
- }
-
- // Scala Long, backed by Crunch's java.lang.Long PType.
- val longs = {
- val in = (x: JLong) => x.longValue()
- val out = (x: Long) => new JLong(x)
- derived(classOf[Long], in, out, ptf.longs())
- }
-
- // Scala Int, backed by Crunch's java.lang.Integer PType.
- val ints = {
- val in = (x: JInt) => x.intValue()
- val out = (x: Int) => new JInt(x)
- derived(classOf[Int], in, out, ptf.ints())
- }
-
- // Scala Float, backed by Crunch's java.lang.Float PType.
- val floats = {
- val in = (x: JFloat) => x.floatValue()
- val out = (x: Float) => new JFloat(x)
- derived(classOf[Float], in, out, ptf.floats())
- }
-
- // Scala Double, backed by Crunch's java.lang.Double PType.
- val doubles = {
- val in = (x: JDouble) => x.doubleValue()
- val out = (x: Double) => new JDouble(x)
- derived(classOf[Double], in, out, ptf.doubles())
- }
-
- // Scala Boolean, backed by Crunch's java.lang.Boolean PType.
- val booleans = {
- val in = (x: JBoolean) => x.booleanValue()
- val out = (x: Boolean) => new JBoolean(x)
- derived(classOf[Boolean], in, out, ptf.booleans())
- }
-
- /** Scala Iterable[T] backed by Crunch's collection PType, converted with JavaConversions. */
- def collections[T](ptype: PType[T]) = {
- derived(classOf[Iterable[T]], collectionAsScalaIterable[T], asJavaCollection[T], ptf.collections(ptype))
- }
-
- /** Scala Map[String, T] backed by Crunch's map PType; keys are always Strings. */
- def maps[T](ptype: PType[T]) = {
- derived(classOf[scala.collection.Map[String, T]], mapAsScalaMap[String, T], mapAsJavaMap[String, T], ptf.maps(ptype))
- }
-
- /**
- * Immutable List[T] backed by Crunch's collection PType. The Java collection is copied into
- * a List when converting to the Scala representation.
- */
- def lists[T](ptype: PType[T]) = {
- val in = (x: JCollection[T]) => collectionAsScalaIterable[T](x).toList
- val out = (x: List[T]) => asJavaCollection[T](x)
- derived(classOf[List[T]], in, out, ptf.collections(ptype))
- }
-
- /**
- * Immutable Set[T] backed by Crunch's collection PType. The Java collection is copied into
- * a Set when converting to the Scala representation, so duplicate elements are dropped.
- */
- def sets[T](ptype: PType[T]) = {
- val in = (x: JCollection[T]) => collectionAsScalaIterable[T](x).toSet
- val out = (x: Set[T]) => asJavaCollection[T](x)
- derived(classOf[Set[T]], in, out, ptf.collections(ptype))
- }
-
- /** Scala (T1, T2) backed by Crunch's pair PType. */
- def tuple2[T1, T2](p1: PType[T1], p2: PType[T2]) = {
- val in = (x: CPair[T1, T2]) => (x.first(), x.second())
- val out = (x: (T1, T2)) => CPair.of(x._1, x._2)
- derived(classOf[(T1, T2)], in, out, ptf.pairs(p1, p2))
- }
-
- /** Scala (T1, T2, T3) backed by Crunch's triple PType. */
- def tuple3[T1, T2, T3](p1: PType[T1], p2: PType[T2], p3: PType[T3]) = {
- val in = (x: CTuple3[T1, T2, T3]) => (x.first(), x.second(), x.third())
- val out = (x: (T1, T2, T3)) => CTuple3.of(x._1, x._2, x._3)
- derived(classOf[(T1, T2, T3)], in, out, ptf.triples(p1, p2, p3))
- }
-
- /** Scala (T1, T2, T3, T4) backed by Crunch's quad PType. */
- def tuple4[T1, T2, T3, T4](p1: PType[T1], p2: PType[T2], p3: PType[T3], p4: PType[T4]) = {
- val in = (x: CTuple4[T1, T2, T3, T4]) => (x.first(), x.second(), x.third(), x.fourth())
- val out = (x: (T1, T2, T3, T4)) => CTuple4.of(x._1, x._2, x._3, x._4)
- derived(classOf[(T1, T2, T3, T4)], in, out, ptf.quads(p1, p2, p3, p4))
- }
-
- /** Table PType with the given key and value types; delegates directly to `ptf.tableOf`. */
- def tableOf[K, V](keyType: PType[K], valueType: PType[V]) = ptf.tableOf(keyType, valueType)
-}
-
-/** Scrunch PType factory backed by Crunch's Writable type family. */
-object Writables extends PTypeFamily {
-  override def ptf = WritableTypeFamily.getInstance
-}
-
-/**
- * Scrunch PType factory backed by Crunch's Avro type family.
- *
- * Initializing this object also installs a ScalaReflectDataFactory as Crunch's global
- * `Avros.REFLECT_DATA_FACTORY`.
- */
-object Avros extends PTypeFamily {
-  override def ptf = AvroTypeFamily.getInstance
-
-  // Global side effect, run once when this object is first initialized.
-  CAvros.REFLECT_DATA_FACTORY = new ScalaReflectDataFactory()
-
-  /** Avro reflection-based PType for the runtime class (erasure) of T. */
-  def reflects[T: ClassManifest]() = CAvros.reflects(implicitly[ClassManifest[T]].erasure)
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala b/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala
deleted file mode 100644
index fa13d3a..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import java.io.File
-
-import org.apache.hadoop.conf.Configuration
-import org.slf4j.LoggerFactory
-
-import org.apache.crunch.{Pipeline => JPipeline}
-import org.apache.crunch.impl.mem.MemPipeline
-import org.apache.crunch.impl.mr.MRPipeline
-import org.apache.crunch.util.DistCache
-import org.apache.scrunch.interpreter.InterpreterRunner
-
-/**
- * Manages the state of a pipeline execution.
- *
- * ==Overview==
- * There are two subtypes of [[org.apache.scrunch.Pipeline]]:
- *  - [[org.apache.scrunch.Pipeline.MapReducePipeline]] - for jobs run on a Hadoop cluster.
- *  - [[org.apache.scrunch.Pipeline.InMemoryPipeline]] - for jobs run in memory.
- *
- * To create a Hadoop pipeline:
- * {{{
- * import org.apache.scrunch.Pipeline
- *
- * Pipeline.mapReduce[MyClass]
- * }}}
- *
- * To get an in memory pipeline:
- * {{{
- * import org.apache.scrunch.Pipeline
- *
- * Pipeline.inMemory
- * }}}
- *
- * @param jpipeline the Crunch pipeline this wrapper delegates to.
- */
-class Pipeline(val jpipeline: JPipeline) extends PipelineLike {
-  /**
-   * A convenience method for reading a text file.
-   *
-   * @param pathName Path to desired text file.
-   * @return A PCollection containing the lines in the specified file.
-   */
-  def readTextFile(pathName: String): PCollection[String] =
-    new PCollection[String](jpipeline.readTextFile(pathName))
-
-  /**
-   * A convenience method for writing a text file.
-   *
-   * @param pcollect A PCollection to write to text.
-   * @param pathName Path to desired output text file.
-   */
-  def writeTextFile[T](pcollect: PCollection[T], pathName: String): Unit = {
-    jpipeline.writeTextFile(pcollect.native, pathName)
-  }
-}
-
-/**
- * Companion object. Contains the concrete Pipeline implementations and factory methods for them.
- */
-object Pipeline {
-  val log = LoggerFactory.getLogger(classOf[Pipeline])
-
-  /**
-   * Creates the Crunch MRPipeline backing a [[MapReducePipeline]].
-   *
-   * It first attempts to add all jars in the directory containing the Scrunch jar (the Scrunch
-   * distribution's lib directory) to the job via DistCache. If the Scrunch jar cannot be located,
-   * it logs a warning instead. When running inside the Scala REPL, the Scrunch jar (the one
-   * containing this class) is used as the job jar instead of the jar containing `clazz`.
-   *
-   * @param clazz Type of the class using the pipeline.
-   * @param configuration Hadoop configuration to use.
-   */
-  private def createMRPipeline(clazz: Class[_], configuration: Configuration): MRPipeline = {
-    Option(DistCache.findContainingJar(classOf[org.apache.scrunch.Pipeline])) match {
-      case Some(jarPath) =>
-        DistCache.addJarDirToDistributedCache(configuration, new File(jarPath).getParent())
-      case None =>
-        log.warn("Could not locate Scrunch jar file, so could not add Scrunch jars to the " +
-          "job(s) about to be run.")
-    }
-    if (InterpreterRunner.repl == null) {
-      new MRPipeline(clazz, configuration)
-    } else {
-      // We're running in the REPL, so use the Scrunch jar as the job jar.
-      new MRPipeline(classOf[org.apache.scrunch.Pipeline], configuration)
-    }
-  }
-
-  /**
-   * Pipeline for running jobs on a hadoop cluster.
-   *
-   * @param clazz Type of the class using the pipeline.
-   * @param configuration Hadoop configuration to use.
-   */
-  class MapReducePipeline (clazz: Class[_], configuration: Configuration)
-    extends Pipeline(createMRPipeline(clazz, configuration))
-
-  /**
-   * Pipeline for running jobs in memory.
-   */
-  object InMemoryPipeline extends Pipeline(MemPipeline.getInstance())
-
-  /**
-   * Creates a pipeline for running jobs on a hadoop cluster using the default configuration.
-   *
-   * @param clazz Type of the class using the pipeline.
-   */
-  def mapReduce(clazz: Class[_]): MapReducePipeline = mapReduce(clazz, new Configuration())
-
-  /**
-   * Creates a pipeline for running jobs on a hadoop cluster.
-   *
-   * @param clazz Type of the class using the pipeline.
-   * @param configuration Hadoop configuration to use.
-   */
-  def mapReduce(clazz: Class[_], configuration: Configuration): MapReducePipeline = {
-    new MapReducePipeline(clazz, configuration)
-  }
-
-  /**
-   * Creates a pipeline for running jobs on a hadoop cluster using the default configuration.
-   *
-   * @tparam T Type of the class using the pipeline.
-   */
-  def mapReduce[T : ClassManifest]: MapReducePipeline = mapReduce[T](new Configuration())
-
-  /**
-   * Creates a pipeline for running jobs on a hadoop cluster.
-   *
-   * @param configuration Hadoop configuration to use.
-   * @tparam T Type of the class using the pipeline.
-   */
-  def mapReduce[T : ClassManifest](configuration: Configuration): MapReducePipeline = {
-    new MapReducePipeline(implicitly[ClassManifest[T]].erasure, configuration)
-  }
-
-  /**
-   * Gets a pipeline for running jobs in memory.
-   */
-  def inMemory: InMemoryPipeline.type = InMemoryPipeline
-
-  /**
-   * Creates a new Pipeline according to the provided specifications.
-   *
-   * @param configuration Configuration for connecting to a Hadoop cluster (ignored when `memory` is true).
-   * @param memory If true, returns the in-memory pipeline; otherwise a MapReduce pipeline.
-   * @param manifest ClassManifest for the class using the pipeline.
-   * @tparam T type of the class using the pipeline.
-   * @deprecated Use either {{{Pipeline.mapReduce(class, conf)}}} or {{{Pipeline.inMemory}}}
-   */
-  def apply[T](
-    configuration: Configuration = new Configuration(),
-    memory: Boolean = false)(implicit manifest: ClassManifest[T]
-  ): Pipeline = if (memory) inMemory else mapReduce(manifest.erasure, configuration)
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PipelineApp.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PipelineApp.scala b/scrunch/src/main/scala/org/apache/scrunch/PipelineApp.scala
deleted file mode 100644
index b427f20..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/PipelineApp.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import java.io.Serializable
-
-import scala.collection.mutable.ListBuffer
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.util.GenericOptionsParser
-
-import org.apache.crunch.{Source, TableSource, Target}
-
-/**
- * Base trait for Scrunch applications that run on MapReduce.
- *
- * Through DelayedInit, the initialization body of a class or object that mixes in this trait is
- * captured rather than run immediately. `main` parses Hadoop's generic options into the
- * pipeline configuration, exposes the remaining arguments through `args`, runs the captured
- * bodies in order, and then calls `done`.
- */
-trait PipelineApp extends MREmbeddedPipeline with PipelineHelper with DelayedInit {
-  /** Lets a String be used wherever a Hadoop `Path` is expected. */
-  implicit def _string2path(str: String): Path = new Path(str)
-
-  /** Contains factory methods used to create `Source`s. */
-  val from = From
-
-  /** Contains factory methods used to create `Target`s. */
-  val to = To
-
-  /** Contains factory methods used to create `SourceTarget`s. */
-  val at = At
-
-  // Initialization bodies captured by delayedInit, in registration order.
-  private val initCode = new ListBuffer[() => Unit]
-
-  // Set by main from the arguments left over after GenericOptionsParser; null before that.
-  private var _args: Array[String] = _
-
-  /** Command-line arguments passed to this application. */
-  protected def args: Array[String] = _args
-
-  /** The configuration of the embedded pipeline. */
-  def configuration: Configuration = pipeline.getConfiguration
-
-  /** Gets the distributed filesystem associated with this application's configuration. */
-  def fs: FileSystem = FileSystem.get(configuration)
-
-  override def delayedInit(body: => Unit): Unit = {
-    initCode += (() => body)
-  }
-
-  def main(args: Array[String]) = {
-    _args = new GenericOptionsParser(configuration, args).getRemainingArgs()
-    initCode.foreach(init => init())
-    done
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PipelineHelper.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PipelineHelper.scala b/scrunch/src/main/scala/org/apache/scrunch/PipelineHelper.scala
deleted file mode 100644
index cdeb37b..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/PipelineHelper.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-/**
- * Convenience methods for building pipelines. Each method forwards to the corresponding
- * operation on its first argument.
- */
-trait PipelineHelper {
-  /**
-   * Materializes the given PCollection and prints each element on its own line.
-   */
-  def dump(data: PCollection[_]): Unit = data.materialize.foreach(println)
-
-  /**
-   * Materializes the given PTable and prints each (key, value) entry on its own line.
-   */
-  def dump(data: PTable[_, _]): Unit = data.materialize.foreach(println)
-
-  /**
-   * Cogroups two PTables by key; same as `t1.cogroup(t2)`.
-   */
-  def cogroup[K : PTypeH, V1 : PTypeH, V2 : PTypeH](t1: PTable[K, V1], t2: PTable[K, V2])
-      : PTable[K, (Iterable[V1], Iterable[V2])] =
-    t1.cogroup(t2)
-
-  /**
-   * Inner-joins two PTables by key; same as `t1.join(t2)`.
-   */
-  def join[K : PTypeH, V1 : PTypeH, V2 : PTypeH](t1: PTable[K, V1], t2: PTable[K, V2])
-      : PTable[K, (V1, V2)] =
-    t1.join(t2)
-
-  /**
-   * Unions `first` with any number of further PCollections.
-   */
-  def union[T](first: PCollection[T], others: PCollection[T]*): PCollection[T] =
-    first.union(others: _*)
-
-  /**
-   * Unions `first` with any number of further PTables.
-   */
-  def union[K, V](first: PTable[K, V], others: PTable[K, V]*): PTable[K, V] =
-    first.union(others: _*)
-}
-
-/**
- * Companion object exposing the [[PipelineHelper]] methods for use via import.
- */
-object PipelineHelper extends PipelineHelper
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala b/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala
deleted file mode 100644
index 92dbaf3..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.hadoop.conf.Configuration
-
-import org.apache.crunch.{Pipeline => JPipeline}
-import org.apache.crunch.Source
-import org.apache.crunch.TableSource
-import org.apache.crunch.Target
-import org.apache.scrunch.interpreter.InterpreterRunner
-
-/**
- * Operations shared by Scrunch pipelines. Each method delegates to the Crunch pipeline
- * returned by `jpipeline`.
- */
-trait PipelineLike {
-  /** The underlying Crunch pipeline. */
-  def jpipeline: JPipeline
-
-  /**
-   * Gets the configuration object associated with this pipeline.
-   */
-  def getConfiguration(): Configuration = jpipeline.getConfiguration()
-
-  /**
-   * Reads a source into a [[org.apache.scrunch.PCollection]].
-   *
-   * @param source The source to read from.
-   * @tparam T The type of the values being read.
-   * @return A PCollection containing data read from the specified source.
-   */
-  def read[T](source: Source[T]): PCollection[T] = new PCollection(jpipeline.read(source))
-
-  /**
-   * Reads a table source into a [[org.apache.scrunch.PTable]].
-   *
-   * @param source The table source to read from.
-   * @tparam K The type of the keys being read.
-   * @tparam V The type of the values being read.
-   * @return A PTable containing data read from the specified source.
-   */
-  def read[K, V](source: TableSource[K, V]): PTable[K, V] = new PTable(jpipeline.read(source))
-
-  /**
-   * Writes a parallel collection to a target.
-   *
-   * @param collection The collection to write.
-   * @param target The destination target for this write.
-   */
-  def write(collection: PCollection[_], target: Target): Unit = jpipeline.write(collection.native, target)
-
-  /**
-   * Writes a parallel table to a target.
-   *
-   * @param table The table to write.
-   * @param target The destination target for this write.
-   */
-  def write(table: PTable[_, _], target: Target): Unit = jpipeline.write(table.native, target)
-
-  /**
-   * Constructs and executes a series of MapReduce jobs in order
-   * to write data to the output targets. Before running, it calls
-   * `InterpreterRunner.addReplJarsToJob` on this pipeline's configuration.
-   */
-  def run(): Unit = {
-    InterpreterRunner.addReplJarsToJob(getConfiguration())
-    jpipeline.run()
-  }
-
-  /**
-   * Run any remaining jobs required to generate outputs and then
-   * clean up any intermediate data files that were created in
-   * this run or previous calls to `run`.
-   */
-  def done(): Unit = jpipeline.done()
-
-  /**
-   * Turn on debug logging for jobs that are run from this pipeline.
-   */
-  def debug(): Unit = jpipeline.enableDebug()
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/interpreter/InterpreterRunner.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/interpreter/InterpreterRunner.scala b/scrunch/src/main/scala/org/apache/scrunch/interpreter/InterpreterRunner.scala
deleted file mode 100644
index e37a0c7..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/interpreter/InterpreterRunner.scala
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch.interpreter
-
-import java.io.File
-import java.io.FileOutputStream
-import java.util.jar.JarEntry
-import java.util.jar.JarOutputStream
-
-import scala.tools.nsc.GenericRunnerCommand
-import scala.tools.nsc.Global
-import scala.tools.nsc.MainGenericRunner
-import scala.tools.nsc.ObjectRunner
-import scala.tools.nsc.Properties
-import scala.tools.nsc.ScriptRunner
-import scala.tools.nsc.interpreter.ILoop
-import scala.tools.nsc.io.Jar
-import scala.tools.nsc.io.VirtualDirectory
-
-import com.google.common.io.Files
-import org.apache.commons.io.IOUtils
-import org.apache.hadoop.conf.Configuration
-
-import org.apache.crunch.util.DistCache
-
-/**
- * An object used to run a Scala REPL with modifications to facilitate Scrunch jobs running
- * within the REPL. When started interactively it keeps a handle on the REPL so that code
- * compiled there can be packaged and shipped with the jobs a pipeline launches.
- */
-object InterpreterRunner extends MainGenericRunner {
-
-  // The actual Scala repl. Stays null unless process() starts an interactive shell.
-  var repl: ILoop = null
-
-  /**
-   * Checks whether or not the Scala repl has been started.
-   *
-   * @return <code>true</code> if the repl is running, <code>false</code> otherwise.
-   */
-  def isReplRunning(): Boolean = repl != null
-
-  /**
-   * The main entry point for the REPL. This method is lifted from
-   * {@link scala.tools.nsc.MainGenericRunner} and modified to facilitate testing whether or not
-   * the REPL is actually running.
-   *
-   * @param args Arguments used on the command line to start the REPL.
-   * @return <code>true</code> if execution was successful, <code>false</code> otherwise.
-   */
-  override def process(args: Array[String]): Boolean = {
-    val command = new GenericRunnerCommand(args.toList, (x: String) => errorFn(x))
-    import command.{settings, howToRun, thingToRun}
-    // Defines a nested function to retrieve a sample compiler if necessary.
-    def sampleCompiler = new Global(settings)
-
-    import Properties.{versionString, copyrightString}
-    if (!command.ok) {
-      return errorFn("\n" + command.shortUsageMsg)
-    } else if (settings.version.value) {
-      return errorFn("Scala code runner %s -- %s".format(versionString, copyrightString))
-    } else if (command.shouldStopWithInfo) {
-      return errorFn(command getInfoMessage sampleCompiler)
-    }
-
-    // Functions to retrieve settings values that were passed to REPL invocation.
-    // The -e argument provides a Scala statement to execute.
-    // The -i option requests that a file be preloaded into the interactive shell.
-    def isE = !settings.execute.isDefault
-    def dashe = settings.execute.value
-    def isI = !settings.loadfiles.isDefault
-    def dashi = settings.loadfiles.value
-
-    // Function to retrieve code passed by -e and -i options to REPL.
-    def combinedCode = {
-      val files = if (isI) dashi map (file => scala.tools.nsc.io.File(file).slurp()) else Nil
-      val str = if (isE) List(dashe) else Nil
-      files ++ str mkString "\n\n"
-    }
-
-    import GenericRunnerCommand._
-
-    // Function for running the target command. It can run an object with main, a script, or
-    // an interactive REPL.
-    def runTarget(): Either[Throwable, Boolean] = howToRun match {
-      case AsObject =>
-        ObjectRunner.runAndCatch(settings.classpathURLs, thingToRun, command.arguments)
-      case AsScript =>
-        ScriptRunner.runScriptAndCatch(settings, thingToRun, command.arguments)
-      case AsJar =>
-        ObjectRunner.runAndCatch(
-          scala.tools.nsc.io.File(thingToRun).toURL +: settings.classpathURLs,
-          new Jar(thingToRun).mainClass getOrElse sys.error("Cannot find main class for jar: " +
-            thingToRun),
-          command.arguments
-        )
-      case Error =>
-        Right(false)
-      case _ =>
-        // We start the shell when no arguments are given. Keeping the ILoop in `repl` is what
-        // lets isReplRunning() and addReplJarsToJob() see it.
-        repl = new ILoop
-        Right(repl.process(settings))
-    }
-
-    // If -e and -i were both given, we want to execute the -e code after the
-    // -i files have been included, so they are read into strings and prepended to
-    // the code given in -e. The -i option is documented to only make sense
-    // interactively so this is a pretty reasonable assumption.
-    //
-    // This all needs a rewrite though.
-    if (isE) {
-      ScriptRunner.runCommand(settings, combinedCode, thingToRun +: command.arguments)
-    }
-    else runTarget() match {
-      case Left(ex) => errorFn(ex)
-      case Right(b) => b
-    }
-  }
-
-  /**
-   * Command-line entry point: runs [[process]] and exits with status 1 if it reports failure.
-   *
-   * @param args Arguments used on the command line to start the REPL.
-   */
-  def main(args: Array[String]): Unit = {
-    val retVal = process(args)
-    if (!retVal)
-      sys.exit(1)
-  }
-
-  /**
-   * Creates a jar file containing the code thus far compiled by the REPL in a temporary directory.
-   *
-   * @return A file object representing the jar file created.
-   * @throws IllegalStateException if the REPL has not been started.
-   */
-  def createReplCodeJar(): File = {
-    if (repl == null) {
-      throw new IllegalStateException("Cannot create a jar of REPL code: the REPL is not running.")
-    }
-    val tempDir = Files.createTempDir()
-    val tempJar = new File(tempDir, "replJar.jar")
-    val jarStream = new JarOutputStream(new FileOutputStream(tempJar))
-    try {
-      addVirtualDirectoryToJar(repl.virtualDirectory, "", jarStream)
-      // Close explicitly on success: closing finishes writing the jar, so a failure here
-      // means the jar is unusable and must not be silently ignored.
-      jarStream.close()
-    } finally {
-      // A no-op after a successful close; on the failure path it releases the file without
-      // masking the exception already in flight.
-      IOUtils.closeQuietly(jarStream)
-    }
-    tempJar
-  }
-
-  /**
-   * Add the contents of the specified virtual directory to a jar. This method will recursively
-   * descend into subdirectories to add their contents. Only directories and `.class` files
-   * are written; other entries are skipped.
-   *
-   * @param dir The virtual directory whose contents should be added.
-   * @param entryPath The entry path for classes found in the virtual directory.
-   * @param jarStream An output stream for writing the jar file.
-   */
-  def addVirtualDirectoryToJar(dir: VirtualDirectory, entryPath: String, jarStream:
-      JarOutputStream): Unit = {
-    dir.foreach { file =>
-      if (file.isDirectory) {
-        // Recursively descend into subdirectories, adjusting the package name as we do.
-        val dirPath = entryPath + file.name + "/"
-        val entry: JarEntry = new JarEntry(dirPath)
-        jarStream.putNextEntry(entry)
-        jarStream.closeEntry()
-        addVirtualDirectoryToJar(file.asInstanceOf[VirtualDirectory],
-          dirPath, jarStream)
-      } else if (file.hasExtension("class")) {
-        // Add class files as an entry in the jar file and write the class to the jar.
-        val entry: JarEntry = new JarEntry(entryPath + file.name)
-        jarStream.putNextEntry(entry)
-        jarStream.write(file.toByteArray)
-        jarStream.closeEntry()
-      }
-    }
-  }
-
-  /**
-   * Generates a jar containing the code thus far compiled by the REPL,
-   * and adds that jar file to the distributed cache of jobs using the specified configuration.
-   * Also adds any jars added with the :cp command to the user's job. Does nothing when the
-   * REPL is not running.
-   *
-   * @param configuration The configuration of jobs that should use the REPL code jar.
-   */
-  def addReplJarsToJob(configuration: Configuration): Unit = {
-    if (isReplRunning()) {
-      // Generate a jar of REPL code and add to the distributed cache.
-      val replJarFile = createReplCodeJar()
-      DistCache.addJarToDistributedCache(configuration, replJarFile)
-      // Get the paths to jars added with the :cp command. NOTE(review): assumes ILoop joins
-      // these with the platform path separator (';' on Windows) -- confirm for the Scala
-      // version in use. Splitting on ':' would break Windows drive letters.
-      val addedJarPaths = repl.addedClasspath.split(File.pathSeparatorChar)
-      addedJarPaths.filter(_.endsWith(".jar")).foreach { path =>
-        DistCache.addJarToDistributedCache(configuration, path)
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scripts/imports.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scripts/imports.scala b/scrunch/src/main/scripts/imports.scala
deleted file mode 100644
index 64d7149..0000000
--- a/scrunch/src/main/scripts/imports.scala
+++ /dev/null
@@ -1,19 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.scrunch._
-
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scripts/scrunch
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scripts/scrunch b/scrunch/src/main/scripts/scrunch
deleted file mode 100755
index 44cb6fb..0000000
--- a/scrunch/src/main/scripts/scrunch
+++ /dev/null
@@ -1,163 +0,0 @@
-#!/bin/bash --posix
-#
-##############################################################################
-# Copyright 2002-2011, LAMP/EPFL
-#
-# This is free software; see the distribution for copying conditions.
-# There is NO warranty; not even for MERCHANTABILITY or FITNESS FOR A
-# PARTICULAR PURPOSE.
-##############################################################################
-#
-# Launches the Scrunch shell: a Scala REPL (org.apache.scrunch.interpreter.InterpreterRunner)
-# whose classpath holds the distribution's conf/ files, its lib/*.jar files and the output
-# of `hadoop classpath`. HADOOP_HOME must be set. JAVACMD, JAVA_HOME and JAVA_OPTS select
-# and tune the JVM; SCALA_RUNNER_DEBUG traces the terminal-settings handling.
-#
-# Options: -D<prop>=<value> goes to both the JVM and the REPL, -J<opt> passes <opt> to the
-# JVM, -toolcp <path> puts <path> first on the classpath; everything else goes to the REPL.
-
-# Identify the bin dir in the distribution from which this script is running.
-bin=$(dirname "$0")
-bin=$(cd "${bin}" && pwd)
-
-# Set the directory where libraries for scrunch shell live.
-SCRUNCH_LIB_DIR="${bin}/../lib"
-# Set the conf directory for the scrunch distribution.
-SCRUNCH_CONF_DIR="${bin}/../conf"
-# Set the main class used to run scrunch shell.
-MAIN_CLASS="org.apache.scrunch.interpreter.InterpreterRunner"
-
-# Not sure what the right default is here: trying nonzero.
-scala_exit_status=127
-saved_stty=""
-
-# restore stty settings (echo in particular)
-function restoreSttySettings() {
-  if [[ -n $SCALA_RUNNER_DEBUG ]]; then
-    echo "restoring stty: $saved_stty"
-  fi
-
-  stty $saved_stty
-  saved_stty=""
-}
-
-# If terminal settings were saved, restore them and exit with the recorded status.
-function onExit() {
-  if [[ "$saved_stty" != "" ]]; then
-    restoreSttySettings
-    exit $scala_exit_status
-  fi
-}
-
-# to reenable echo if we are interrupted before completing.
-trap onExit INT
-
-# save terminal settings, clearing them if stty fails (e.g. no terminal) so we don't
-# later try to restore them. Test stty's own exit status: `[[ ! $? ]]` only checks
-# whether the string "$?" is empty, which it never is.
-if ! saved_stty=$(stty -g 2>/dev/null); then
-  saved_stty=""
-fi
-if [[ -n $SCALA_RUNNER_DEBUG ]]; then
-  echo "saved stty: $saved_stty"
-fi
-
-cygwin=false;
-case "$(uname)" in
-  CYGWIN*) cygwin=true ;;
-esac
-
-# Constructing scrunch shell classpath: every file in the conf dir, then every lib jar.
-SCRUNCH_SHELL_CLASSPATH=""
-for ext in "$SCRUNCH_CONF_DIR"/* "$SCRUNCH_LIB_DIR"/*.jar ; do
-  SCRUNCH_SHELL_CLASSPATH="${SCRUNCH_SHELL_CLASSPATH:+$SCRUNCH_SHELL_CLASSPATH:}$ext"
-done
-
-# Constructing Hadoop classpath.
-if [ -z "$HADOOP_HOME" ]; then
-  echo "HADOOP_HOME must be set to run the Scrunch shell." >&2
-  exit 1
-fi
-HADOOP_CLASSPATH=$("$HADOOP_HOME/bin/hadoop" classpath)
-
-CYGWIN_JLINE_TERMINAL=
-if $cygwin; then
-  if [ "$OS" = "Windows_NT" ] && cygpath -m .>/dev/null 2>/dev/null ; then
-    format=mixed
-  else
-    format=windows
-  fi
-  SCRUNCH_SHELL_CLASSPATH=$(cygpath --path --$format "$SCRUNCH_SHELL_CLASSPATH")
-  case "$TERM" in
-    rxvt* | xterm*)
-      stty -icanon min 1 -echo
-      CYGWIN_JLINE_TERMINAL="-Djline.terminal=scala.tools.jline.UnixTerminal"
-      ;;
-  esac
-fi
-
-[ -n "$JAVA_OPTS" ] || JAVA_OPTS="-Xmx256M -Xms32M"
-
-# break out -D and -J options and add them to JAVA_OPTS as well
-# so they reach the underlying JVM in time to do some good. The
-# -D options will be available as system properties.
-declare -a java_args
-declare -a scala_args
-
-while [ $# -gt 0 ]; do
-  case "$1" in
-    -D*)
-      # pass to scala as well: otherwise we lose it sometimes when we
-      # need it, e.g. communicating with a server compiler.
-      java_args=("${java_args[@]}" "$1")
-      scala_args=("${scala_args[@]}" "$1")
-      shift
-      ;;
-    -J*)
-      # as with -D, pass to scala even though it will almost
-      # never be used.
-      java_args=("${java_args[@]}" "${1:2}")
-      scala_args=("${scala_args[@]}" "$1")
-      shift
-      ;;
-    -toolcp)
-      # Without a path, `shift 2` fails without shifting and this loop would never end.
-      if [ $# -lt 2 ]; then
-        echo "-toolcp requires a classpath argument." >&2
-        # Restore the terminal (it may have been changed above on cygwin) and exit.
-        scala_exit_status=1
-        onExit
-        exit 1
-      fi
-      TOOL_CLASSPATH="${TOOL_CLASSPATH:+$TOOL_CLASSPATH:}$2"
-      shift 2
-      ;;
-    *)
-      scala_args=("${scala_args[@]}" "$1")
-      shift
-      ;;
-  esac
-done
-
-# reset "$@" to the remaining args
-set -- "${scala_args[@]}"
-
-if [ -z "$JAVACMD" -a -n "$JAVA_HOME" -a -x "$JAVA_HOME/bin/java" ]; then
-  JAVACMD="$JAVA_HOME/bin/java"
-fi
-
-# Full JVM classpath, tool classpath first. Building it this way avoids an empty leading
-# entry, which the JVM would treat as the current directory.
-SHELL_CLASSPATH="${TOOL_CLASSPATH:+$TOOL_CLASSPATH:}${SCRUNCH_SHELL_CLASSPATH}:${HADOOP_CLASSPATH}"
-
-# Load everything through -classpath; don't use the bootstrap classloader.
-"${JAVACMD:=java}" \
-  $JAVA_OPTS \
-  "${java_args[@]}" \
-  -classpath "$SHELL_CLASSPATH" \
-  -Dscala.usejavacp=true \
-  -Denv.emacs="$EMACS" \
-  $CYGWIN_JLINE_TERMINAL \
-  $MAIN_CLASS "$@" \
-  -i "${bin}/imports.scala" \
-  -Yrepl-sync
-# The -Yrepl-sync option is a fix for the 2.9.1 REPL. This should probably not be necessary in the future.
-
-# record the exit status lest it be overwritten:
-# then reenable echo and propagate the code.
-scala_exit_status=$?
-onExit
-# onExit only exits when it had terminal settings to restore; propagate the status otherwise.
-exit $scala_exit_status
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scripts/scrunch-job.py
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scripts/scrunch-job.py b/scrunch/src/main/scripts/scrunch-job.py
deleted file mode 100755
index 2a61b3b..0000000
--- a/scrunch/src/main/scripts/scrunch-job.py
+++ /dev/null
@@ -1,133 +0,0 @@
-#!/usr/bin/python
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import glob
-import os
-import re
-import shutil
-import subprocess
-import sys
-
-# Configuration in script
-##############################################################
-if not "SCALA_HOME" in os.environ:
- sys.stderr.write("Environment variable SCALA_HOME must be set\n")
- sys.exit(1)
-SCALA_LIB = os.path.join(os.environ["SCALA_HOME"], "lib")
-
-if not "HADOOP_HOME" in os.environ:
- sys.stderr.write("Environment variable HADOOP_HOME must be set\n")
- sys.exit(1)
-HADOOP_HOME = os.environ["HADOOP_HOME"]
-HADOOP_JARS = ":".join(glob.glob(os.path.join(HADOOP_HOME, "*.jar")))
-
-#Get the absolute path of the original (non-symlink) file.
-if os.path.islink(__file__):
- ORIGINAL_FILE = os.readlink(__file__)
-else:
- ORIGINAL_FILE = __file__
-
-DIST_ROOT = os.path.abspath(os.path.dirname(ORIGINAL_FILE)+"/../")
-LIB_DIR = DIST_ROOT + "/lib" # Dir with all scrunch dependencies.
-TMPDIR = "/tmp"
-BUILDDIR = TMPDIR + "/script-build"
-COMPILE_CMD = "java -cp %s/scala-library.jar:%s/scala-compiler.jar -Dscala.home=%s scala.tools.nsc.Main" % (SCALA_LIB, SCALA_LIB, SCALA_LIB)
-##############################################################
-
-argv = sys.argv[1:]
-if len(argv) < 1:
- sys.stderr.write("ERROR: insufficient args.\n")
- sys.exit(1)
-
-JOBFILE = argv.pop(0)
-
-def file_type():
- m = re.search(r'\.(scala|java)$', JOBFILE)
- if m:
- return m.group(1)
- return None
-
-def is_file():
- return file_type() is not None
-
-PACK_RE = r'package ([^;]+)'
-OBJECT_RE = r'object\s+([^\s(]+).*(extends|with)\s+PipelineApp.*'
-EXTENSION_RE = r'(.*)\.(scala|java)$'
-
-#Get the name of the job from the file.
-#the rule is: last class in the file, or the one that matches the filename
-def get_job_name(file):
- package = ""
- job = None
- default = None
- match = re.search(EXTENSION_RE, file)
- if match:
- default = match.group(1)
- for s in open(file, "r"):
- mp = re.search(PACK_RE, s)
- mo = re.search(OBJECT_RE, s)
- if mp:
- package = mp.group(1).trim() + "."
- elif mo:
- if not job or not default or not job.tolower() == default.tolower():
- #use either the last class, or the one with the same name as the file
- job = mo.group(1)
- if not job:
- raise "Could not find job name"
- return "%s%s" % (package, job)
- else:
- return file
-
-LIB_PATH = os.path.abspath(LIB_DIR)
-if not os.path.exists(LIB_PATH):
- sys.stderr.write("Scrunch distribution lib directory not found; run mvn package to construct a distribution to run examples from.\n")
- sys.exit(1)
-LIB_JARS = glob.glob(os.path.join(LIB_PATH, "*.jar"))
-LIB_CP = ":".join(LIB_JARS)
-
-JOBPATH = os.path.abspath(JOBFILE)
-JOB = get_job_name(JOBFILE)
-JOBJAR = JOB + ".jar"
-JOBJARPATH = os.path.join(TMPDIR, JOBJAR)
-
-def needs_rebuild():
- return not os.path.exists(JOBJARPATH) or os.stat(JOBJARPATH).st_mtime < os.stat(JOBPATH).st_mtime
-
-def build_job_jar():
- sys.stderr.write("compiling " + JOBFILE + "\n")
- if os.path.exists(BUILDDIR):
- shutil.rmtree(BUILDDIR)
- os.makedirs(BUILDDIR)
- cmd = "%s -classpath %s:%s -d %s %s" % (COMPILE_CMD, LIB_CP, HADOOP_JARS, BUILDDIR, JOBFILE)
- print cmd
- if subprocess.call(cmd, shell=True):
- shutil.rmtree(BUILDDIR)
- sys.exit(1)
-
- jar_cmd = "jar cf %s -C %s ." % (JOBJARPATH, BUILDDIR)
- subprocess.call(jar_cmd, shell=True)
- shutil.rmtree(BUILDDIR)
-
-def hadoop_command():
- return "HADOOP_CLASSPATH=%s ; %s/bin/hadoop jar %s %s %s" % (LIB_CP, HADOOP_HOME, JOBJARPATH, JOB, " ".join(argv))
-
-if is_file() and needs_rebuild():
- build_job_jar()
-
-SHELL_COMMAND = hadoop_command()
-print SHELL_COMMAND
-os.system(SHELL_COMMAND)
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/scrunch/src/site/markdown/index.md b/scrunch/src/site/markdown/index.md
deleted file mode 100644
index 32a9279..0000000
--- a/scrunch/src/site/markdown/index.md
+++ /dev/null
@@ -1,20 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-# Scrunch - A Scala Wrapper for Crunch
----
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/site/site.xml
----------------------------------------------------------------------
diff --git a/scrunch/src/site/site.xml b/scrunch/src/site/site.xml
deleted file mode 100644
index 73fbd17..0000000
--- a/scrunch/src/site/site.xml
+++ /dev/null
@@ -1,34 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project name="${project.name}"
- xmlns="http://maven.apache.org/DECORATION/1.3.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/DECORATION/1.3.0
- http://maven.apache.org/xsd/decoration-1.3.0.xsd">
-
- <body>
- <!-- Note: Breadcrumbs for Doxia's Markdown parser are currently broken,
- see https://jira.codehaus.org/browse/DOXIA-472 -->
- <breadcrumbs>
- <item name="Apache" href="http://www.apache.org/index.html" />
- <item name="Crunch" href="../index.html"/>
- </breadcrumbs>
-
- </body>
-
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/src/site/markdown/scrunch.md
----------------------------------------------------------------------
diff --git a/src/site/markdown/scrunch.md b/src/site/markdown/scrunch.md
index 7d0585a..324b88a 100644
--- a/src/site/markdown/scrunch.md
+++ b/src/site/markdown/scrunch.md
@@ -32,9 +32,9 @@ a mixture of functional and object-oriented programming styles and has powerful
capabilities, allowing us to create complex pipelines using very few keystrokes. Here is
the Scrunch analogue of the classic WordCount problem:
- import com.cloudera.crunch.io.{From => from}
- import com.cloudera.scrunch._
- import com.cloudera.scrunch.Conversions_ # For implicit type conversions
+ import org.apache.crunch.io.{From => from}
+ import org.apache.crunch.scrunch._
+ import org.apache.crunch.scrunch.Conversions._ // For implicit type conversions
class WordCountExample {
val pipeline = new Pipeline[WordCountExample]
@@ -80,7 +80,6 @@ the output of a Crunch pipeline into the client:
Scrunch is alpha-quality code, written by someone who was learning Scala on the fly. There will be bugs,
rough edges, and non-idiomatic Scala usage all over the place. This will improve with time, and we welcome
contributions from Scala experts who are interested in helping us make Scrunch into a first-class project.
-The Crunch developers mailing list is [here](https://groups.google.com/a/cloudera.org/group/crunch-dev/topics).
Scrunch emerged out of conversations with [Dmitriy Ryaboy](http://twitter.com/#!/squarecog),
[Oscar Boykin](http://twitter.com/#!/posco), and [Avi Bryant](http://twitter.com/#!/avibryant) from Twitter.