Posted to commits@samza.apache.org by cr...@apache.org on 2013/08/12 18:21:37 UTC

[07/15] initial import.

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
new file mode 100644
index 0000000..164a2ee
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics
+
+import java.lang.management.ManagementFactory
+import scala.collection._
+import scala.collection.JavaConversions._
+import java.lang.Thread.State._
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+import grizzled.slf4j.Logging
+import org.apache.samza.util.Util
+import org.apache.samza.util.DaemonThreadFactory
+
+/**
+ * A straight adaptation of Hadoop's metrics2 JvmMetrics class.
+ */
+class JvmMetrics(group: String, registry: MetricsRegistry) extends Runnable with Logging {
+  final val M = 1024 * 1024.0f
+
+  def this(registry: MetricsRegistry) = this("samza.jvm", registry)
+
+  val memoryMXBean = ManagementFactory.getMemoryMXBean()
+  val gcBeans = ManagementFactory.getGarbageCollectorMXBeans()
+  val threadMXBean = ManagementFactory.getThreadMXBean()
+  var gcBeanCounters = Map[String, (Counter, Counter)]()
+  val executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory)
+
+  // jvm metrics
+  val gMemNonHeapUsedM = registry.newGauge[Float](group, "MemNonHeapUsedM", 0)
+  val gMemNonHeapCommittedM = registry.newGauge[Float](group, "MemNonHeapCommittedM", 0)
+  val gMemHeapUsedM = registry.newGauge[Float](group, "MemHeapUsedM", 0)
+  val gMemHeapCommittedM = registry.newGauge[Float](group, "MemHeapCommittedM", 0)
+  val gThreadsNew = registry.newGauge[Long](group, "ThreadsNew", 0)
+  val gThreadsRunnable = registry.newGauge[Long](group, "ThreadsRunnable", 0)
+  val gThreadsBlocked = registry.newGauge[Long](group, "ThreadsBlocked", 0)
+  val gThreadsWaiting = registry.newGauge[Long](group, "ThreadsWaiting", 0)
+  val gThreadsTimedWaiting = registry.newGauge[Long](group, "ThreadsTimedWaiting", 0)
+  val gThreadsTerminated = registry.newGauge[Long](group, "ThreadsTerminated", 0)
+  val cGcCount = registry.newCounter(group, "GcCount")
+  val cGcTimeMillis = registry.newCounter(group, "GcTimeMillis")
+
+  def start {
+    executor.scheduleWithFixedDelay(this, 0, 5, TimeUnit.SECONDS)
+  }
+
+  def run {
+    debug("updating jvm metrics")
+
+    updateMemoryUsage
+    updateGcUsage
+    updateThreadUsage
+
+    debug("updated metrics to: [%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s]" format
+      (gMemNonHeapUsedM, gMemNonHeapCommittedM, gMemHeapUsedM, gMemHeapCommittedM, gThreadsNew,
+        gThreadsRunnable, gThreadsBlocked, gThreadsWaiting, gThreadsTimedWaiting,
+        gThreadsTerminated, cGcCount, cGcTimeMillis))
+  }
+
+  def stop = executor.shutdown
+
+  private def updateMemoryUsage {
+    val memNonHeap = memoryMXBean.getNonHeapMemoryUsage()
+    val memHeap = memoryMXBean.getHeapMemoryUsage()
+    gMemNonHeapUsedM.set(memNonHeap.getUsed() / M)
+    gMemNonHeapCommittedM.set(memNonHeap.getCommitted() / M)
+    gMemHeapUsedM.set(memHeap.getUsed() / M)
+    gMemHeapCommittedM.set(memHeap.getCommitted() / M)
+  }
+
+  private def updateGcUsage {
+    var count = 0L
+    var timeMillis = 0L
+
+    gcBeans.foreach(gcBean => {
+      val c = gcBean.getCollectionCount()
+      val t = gcBean.getCollectionTime()
+      val gcInfo = getGcInfo(gcBean.getName)
+      gcInfo._1.inc(c - gcInfo._1.getCount())
+      gcInfo._2.inc(t - gcInfo._2.getCount())
+      count += c
+      timeMillis += t
+    })
+
+    cGcCount.inc(count - cGcCount.getCount())
+    cGcTimeMillis.inc(timeMillis - cGcTimeMillis.getCount())
+  }
+
+  private def getGcInfo(gcName: String): (Counter, Counter) = {
+    gcBeanCounters.get(gcName) match {
+      case Some(gcBeanCounterTuple) => gcBeanCounterTuple
+      case _ => {
+        val t = (registry.newCounter(group, "GcCount" + gcName), registry.newCounter(group, "GcTimeMillis" + gcName))
+        gcBeanCounters += (gcName -> t)
+        t
+      }
+    }
+  }
+
+  private def updateThreadUsage {
+    var threadsNew = 0L
+    var threadsRunnable = 0L
+    var threadsBlocked = 0L
+    var threadsWaiting = 0L
+    var threadsTimedWaiting = 0L
+    var threadsTerminated = 0L
+    val threadIds = threadMXBean.getAllThreadIds
+
+    threadMXBean.getThreadInfo(threadIds, 0).foreach(threadInfo =>
+      Option(threadInfo) match {
+        case Some(threadInfo) => {
+          threadInfo.getThreadState match {
+            case NEW => threadsNew += 1
+            case RUNNABLE => threadsRunnable += 1
+            case BLOCKED => threadsBlocked += 1
+            case WAITING => threadsWaiting += 1
+            case TIMED_WAITING => threadsTimedWaiting += 1
+            case TERMINATED => threadsTerminated += 1
+          }
+        }
+        case _ => // race protection
+      })
+
+    gThreadsNew.set(threadsNew)
+    gThreadsRunnable.set(threadsRunnable)
+    gThreadsBlocked.set(threadsBlocked)
+    gThreadsWaiting.set(threadsWaiting)
+    gThreadsTimedWaiting.set(threadsTimedWaiting)
+    gThreadsTerminated.set(threadsTerminated)
+  }
+}
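
A minimal usage sketch of the JvmMetrics class above, using only classes from this patch (the registry group and placement are placeholders):

    import org.apache.samza.metrics.{JvmMetrics, MetricsRegistryMap}

    val registry = new MetricsRegistryMap
    val jvmMetrics = new JvmMetrics(registry) // defaults to the "samza.jvm" group
    jvmMetrics.start                          // samples memory, GC, and thread metrics every 5 seconds
    // ... run the container ...
    jvmMetrics.stop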

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
new file mode 100644
index 0000000..fc0bd38
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics
+import grizzled.slf4j.Logging
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * A class that holds all metrics registered with it. It can be registered
+ * with one or more MetricsReporters to flush metrics.
+ */
+class MetricsRegistryMap extends ReadableMetricsRegistry with Logging {
+  var listeners = Set[ReadableMetricsRegistryListener]()
+
+  /*
+   * groupName -> metricName -> metric
+   */
+  val metrics = new ConcurrentHashMap[String, ConcurrentHashMap[String, Metric]]
+
+  def newCounter(group: String, name: String) = {
+    debug("Creating new counter %s %s." format (group, name))
+    putAndGetGroup(group).putIfAbsent(name, new Counter(name))
+    val counter = metrics.get(group).get(name).asInstanceOf[Counter]
+    listeners.foreach(_.onCounter(group, counter))
+    counter
+  }
+
+  def newGauge[T](group: String, name: String, value: T) = {
+    debug("Creating new gauge %s %s %s." format (group, name, value))
+    putAndGetGroup(group).putIfAbsent(name, new Gauge[T](name, value))
+    val gauge = metrics.get(group).get(name).asInstanceOf[Gauge[T]]
+    listeners.foreach(_.onGauge(group, gauge))
+    gauge
+  }
+
+  private def putAndGetGroup(group: String) = {
+    metrics.putIfAbsent(group, new ConcurrentHashMap[String, Metric])
+    metrics.get(group)
+  }
+
+  def getGroups = metrics.keySet()
+
+  def getGroup(group: String) = metrics.get(group)
+
+  override def toString() = metrics.toString
+
+  def listen(listener: ReadableMetricsRegistryListener) {
+    listeners += listener
+  }
+
+  def unlisten(listener: ReadableMetricsRegistryListener) {
+    listeners -= listener
+  }
+}
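
A short sketch of how the registry above is used (group and metric names are placeholders):

    import org.apache.samza.metrics.MetricsRegistryMap

    val registry = new MetricsRegistryMap
    // Metrics are created on demand; repeated calls with the same (group, name) return the same metric.
    val messagesSent = registry.newCounter("my-group", "MessagesSent")
    val queueSize = registry.newGauge[Long]("my-group", "QueueSize", 0L)
    messagesSent.inc(1L)
    queueSize.set(42L)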

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
new file mode 100644
index 0000000..8814e68
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter
+
+import java.lang.management.ManagementFactory
+import grizzled.slf4j.Logging
+import javax.management.MBeanServer
+import javax.management.ObjectName
+import org.apache.samza.config.Config
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.Gauge
+import org.apache.samza.metrics.MetricsReporter
+import org.apache.samza.metrics.MetricsReporterFactory
+import org.apache.samza.metrics.ReadableMetricsRegistry
+import org.apache.samza.metrics.ReadableMetricsRegistryListener
+import scala.collection.JavaConversions._
+import org.apache.samza.metrics.MetricsVisitor
+
+class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
+  var sources = Map[ReadableMetricsRegistry, String]()
+  var listeners = Map[ReadableMetricsRegistry, ReadableMetricsRegistryListener]()
+
+  def start() {
+    for ((registry, listener) <- listeners) {
+      // First, add a listener for all new metrics that are added.
+      registry.listen(listener)
+
+      // Second, add all existing metrics.
+      registry.getGroups.foreach(group => {
+        registry.getGroup(group).foreach {
+          case (name, metric) =>
+            metric.visit(new MetricsVisitor {
+              def counter(counter: Counter) = registerBean(new JmxCounter(counter, getObjectName(group, name, sources(registry))));
+              def gauge[T](gauge: Gauge[T]) = registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, name, sources(registry))))
+            })
+        }
+      })
+    }
+  }
+
+  def register(source: String, registry: ReadableMetricsRegistry) {
+    if (!listeners.contains(registry)) {
+      sources += registry -> source
+      listeners += registry -> new ReadableMetricsRegistryListener {
+        def onCounter(group: String, counter: Counter) {
+          registerBean(new JmxCounter(counter, getObjectName(group, counter.getName, source)))
+        }
+
+        def onGauge(group: String, gauge: Gauge[_]) {
+          registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, gauge.getName, source)))
+        }
+      }
+    } else {
+      warn("Trying to re-register a registry for source %s. Ignoring." format source)
+    }
+  }
+
+  def stop() {
+    for ((registry, listener) <- listeners) {
+      registry.unlisten(listener)
+    }
+  }
+
+  def getObjectName(group: String, name: String, t: String) = {
+    val nameBuilder = new StringBuilder
+    nameBuilder.append(makeNameJmxSafe(group))
+    nameBuilder.append(":type=")
+    nameBuilder.append(makeNameJmxSafe(t))
+    nameBuilder.append(",name=")
+    nameBuilder.append(makeNameJmxSafe(name))
+    val objName = new ObjectName(nameBuilder.toString)
+    debug("Resolved name for %s, %s, %s to: %s" format (group, name, t, objName))
+    objName
+  }
+
+  /*
+   * JMX only has ObjectName.quote, which is pretty nasty looking. This 
+   * function escapes without quoting, using the rules outlined in: 
+   * http://docs.oracle.com/javase/1.5.0/docs/api/javax/management/ObjectName.html
+   */
+  def makeNameJmxSafe(str: String) = str
+    .replace(",", "_")
+    .replace("=", "_")
+    .replace(":", "_")
+    .replace("\"", "_")
+    .replace("*", "_")
+    .replace("?", "_")
+
+  def registerBean(bean: MetricMBean) {
+    if (!server.isRegistered(bean.objectName)) {
+      debug("Registering MBean for %s." format bean.objectName)
+      server.registerMBean(bean, bean.objectName);
+    }
+  }
+}
+
+trait MetricMBean {
+  def objectName(): ObjectName
+}
+
+abstract class AbstractBean(val on: ObjectName) extends MetricMBean {
+  override def objectName = on
+}
+
+trait JmxGaugeMBean extends MetricMBean {
+  def getValue(): Object
+}
+
+class JmxGauge(g: org.apache.samza.metrics.Gauge[Object], on: ObjectName) extends JmxGaugeMBean {
+  def getValue = g.getValue
+  def objectName = on
+}
+
+trait JmxCounterMBean extends MetricMBean {
+  def getCount(): Long
+}
+
+class JmxCounter(c: org.apache.samza.metrics.Counter, on: ObjectName) extends JmxCounterMBean {
+  def getCount() = c.getCount()
+  def objectName = on
+}
+
+class JmxReporterFactory extends MetricsReporterFactory with Logging {
+  def getMetricsReporter(name: String, containerName: String, config: Config) = {
+    info("Creating JMX reporter with name %s." format name)
+    new JmxReporter(ManagementFactory.getPlatformMBeanServer)
+  }
+}
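
A usage sketch for the JMX reporter, wiring it to a registry the way a container would (the source and metric names are placeholders):

    import java.lang.management.ManagementFactory
    import org.apache.samza.metrics.MetricsRegistryMap
    import org.apache.samza.metrics.reporter.JmxReporter

    val registry = new MetricsRegistryMap
    val reporter = new JmxReporter(ManagementFactory.getPlatformMBeanServer)
    reporter.register("samza-container-0", registry)
    reporter.start()                                   // exposes existing metrics and listens for new ones
    registry.newCounter("my-group", "MessagesSent")    // shows up as an MBean via the listener
    reporter.stop()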

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala
new file mode 100644
index 0000000..d7aec8b
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter
+
+import java.util.Collections
+import java.util.HashMap
+import java.util.Map
+import scala.collection.JavaConversions._
+
+object Metrics {
+  def fromMap(map: Map[String, Map[String, Object]]): Metrics = {
+    new Metrics(map)
+  }
+}
+
+/**
+ * Immutable metrics snapshot.
+ */
+class Metrics(metrics: Map[String, Map[String, Object]]) {
+  val immutableMetrics = new HashMap[String, Map[String, Object]]
+
+  for (groupEntry <- metrics.entrySet) {
+    val immutableMetricGroup = new HashMap[String, Object]
+
+    for (metricEntry <- groupEntry.getValue.asInstanceOf[Map[String, Object]].entrySet) {
+      immutableMetricGroup.put(metricEntry.getKey, metricEntry.getValue)
+    }
+
+    immutableMetrics.put(groupEntry.getKey, Collections.unmodifiableMap(immutableMetricGroup))
+  }
+
+  def get[T](group: String, metricName: String) =
+    immutableMetrics.get(group).get(metricName).asInstanceOf[T]
+
+  def get(group: String) = immutableMetrics.get(group)
+
+  def getAsMap(): Map[String, Map[String, Object]] = Collections.unmodifiableMap(immutableMetrics)
+}
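
A sketch of building and reading a Metrics snapshot from a raw nested map (group and metric names are placeholders):

    import java.util.HashMap
    import org.apache.samza.metrics.reporter.Metrics

    val group = new HashMap[String, Object]
    group.put("MessagesSent", 10L: java.lang.Long)

    val raw = new HashMap[String, java.util.Map[String, Object]]
    raw.put("my-group", group)

    val metrics = Metrics.fromMap(raw)
    val sent = metrics.get[java.lang.Long]("my-group", "MessagesSent") // 10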

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala
new file mode 100644
index 0000000..369c718
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter
+
+import java.util.HashMap
+import java.util.Map
+import scala.reflect.BeanProperty
+
+object MetricsHeader {
+  def fromMap(map: Map[String, Object]): MetricsHeader = {
+    new MetricsHeader(
+      map.get("job-name").toString,
+      map.get("job-id").toString,
+      map.get("container-name").toString,
+      map.get("source").toString,
+      map.get("version").toString,
+      map.get("samza-version").toString,
+      map.get("host").toString,
+      map.get("time").asInstanceOf[Number].longValue,
+      map.get("reset-time").asInstanceOf[Number].longValue)
+  }
+}
+
+/**
+ * Immutable metric header snapshot.
+ */
+class MetricsHeader(
+  @BeanProperty val jobName: String,
+  @BeanProperty val jobId: String,
+  @BeanProperty val containerName: String,
+  @BeanProperty val source: String,
+  @BeanProperty val version: String,
+  @BeanProperty val samzaVersion: String,
+  @BeanProperty val host: String,
+  @BeanProperty val time: Long,
+  @BeanProperty val resetTime: Long) {
+
+  def getAsMap: Map[String, Object] = {
+    val map = new HashMap[String, Object]
+    map.put("job-name", jobName)
+    map.put("job-id", jobId)
+    map.put("container-name", containerName)
+    map.put("source", source)
+    map.put("version", version)
+    map.put("samza-version", samzaVersion)
+    map.put("host", host)
+    map.put("time", time: java.lang.Long)
+    map.put("reset-time", resetTime: java.lang.Long)
+    map
+  }
+}
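
A round-trip sketch for the header (all field values are placeholders):

    import org.apache.samza.metrics.reporter.MetricsHeader

    val header = new MetricsHeader("my-samza-job", "1", "container_0", "source-0",
      "0.0.1", "0.0.1", "localhost", System.currentTimeMillis, System.currentTimeMillis)
    val asMap = header.getAsMap                      // flat map keyed by "job-name", "time", etc.
    val roundTripped = MetricsHeader.fromMap(asMap)  // rebuilds an equivalent header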

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshot.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshot.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshot.scala
new file mode 100644
index 0000000..da775f7
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshot.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter
+
+import java.util.Map
+import java.util.HashMap
+import scala.reflect.BeanProperty
+
+object MetricsSnapshot {
+  def fromMap(map: Map[String, Map[String, Object]]) = {
+    val header = MetricsHeader.fromMap(map.get("header"))
+    val metrics = Metrics.fromMap(map.get("metrics").asInstanceOf[Map[String, Map[String, Object]]])
+    new MetricsSnapshot(header, metrics)
+  }
+}
+
+class MetricsSnapshot(@BeanProperty val header: MetricsHeader, @BeanProperty val metrics: Metrics) {
+  def getAsMap(): Map[String, Object] = {
+    val map = new HashMap[String, Object]
+
+    map.put("header", header.getAsMap)
+    map.put("metrics", metrics.getAsMap)
+
+    map
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
new file mode 100644
index 0000000..79c647e
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter
+
+import java.util.HashMap
+import java.util.Map
+import scala.collection.JavaConversions._
+import grizzled.slf4j.Logging
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.Gauge
+import org.apache.samza.metrics.MetricsReporter
+import org.apache.samza.metrics.MetricsVisitor
+import org.apache.samza.metrics.ReadableMetricsRegistry
+import java.util.concurrent.Executors
+import org.apache.samza.util.DaemonThreadFactory
+import java.util.concurrent.TimeUnit
+import org.apache.samza.serializers.Serializer
+import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.OutgoingMessageEnvelope
+
+/**
+ * MetricsSnapshotReporter is a generic metrics reporter that sends metrics to a stream.
+ *
+ * jobName // my-samza-job
+ * jobId // an id that differentiates multiple executions of the same job
+ * containerName // container_567890
+ * host // eat1-app128.gird
+ * version // 0.0.1
+ */
+class MetricsSnapshotReporter(
+  producer: SystemProducer,
+  out: SystemStream,
+  jobName: String,
+  jobId: String,
+  containerName: String,
+  version: String,
+  samzaVersion: String,
+  host: String,
+  serializer: Serializer[MetricsSnapshot] = null,
+  clock: () => Long = () => { System.currentTimeMillis }) extends MetricsReporter with Runnable with Logging {
+
+  val executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory)
+  val resetTime = clock()
+  var registries = List[(String, ReadableMetricsRegistry)]()
+
+  info("got metrics snapshot reporter properties [job name: %s, job id: %s, containerName: %s, version: %s, samzaVersion: %s, host: %s]"
+    format (jobName, jobId, containerName, version, samzaVersion, host))
+
+  def start {
+    info("Starting producer.")
+
+    producer.start
+
+    info("Starting reporter timer.")
+
+    // TODO could make this configurable.
+    executor.scheduleWithFixedDelay(this, 0, 60, TimeUnit.SECONDS)
+  }
+
+  def register(source: String, registry: ReadableMetricsRegistry) {
+    registries ::= (source, registry)
+
+    info("Registering %s with producer." format source)
+
+    producer.register(source)
+  }
+
+  def stop = {
+    info("Stopping producer.")
+
+    producer.stop
+
+    info("Stopping reporter timer.")
+
+    executor.shutdown
+    executor.awaitTermination(60, TimeUnit.SECONDS)
+
+    if (!executor.isTerminated) {
+      warn("Unable to shutdown reporter timer.")
+    }
+  }
+
+  def run {
+    debug("Begin flushing metrics.")
+
+    for ((source, registry) <- registries) {
+      debug("Flushing metrics for %s." format source)
+
+      val metricsMsg = new HashMap[String, Map[String, Object]]
+
+      // metrics
+      registry.getGroups.foreach(group => {
+        val groupMsg = new HashMap[String, Object]
+
+        registry.getGroup(group).foreach {
+          case (name, metric) =>
+            metric.visit(new MetricsVisitor {
+              def counter(counter: Counter) = groupMsg.put(name, counter.getCount: java.lang.Long)
+              def gauge[T](gauge: Gauge[T]) = groupMsg.put(name, gauge.getValue.asInstanceOf[Object])
+            })
+        }
+
+        metricsMsg.put(group, groupMsg)
+      })
+
+      val header = new MetricsHeader(jobName, jobId, containerName, source, version, samzaVersion, host, clock(), resetTime)
+      val metrics = new Metrics(metricsMsg)
+
+      debug("Flushing metrics for %s to %s with header and map: header=%s, map=%s." format (source, out, header.getAsMap, metrics.getAsMap))
+
+      val metricsSnapshot = new MetricsSnapshot(header, metrics)
+      val maybeSerialized = if (serializer != null) {
+        serializer.toBytes(metricsSnapshot)
+      } else {
+        metricsSnapshot
+      }
+
+      producer.send(source, new OutgoingMessageEnvelope(out, host, null, maybeSerialized))
+
+      // Always commit, since we don't want metrics to get batched up.
+      producer.commit(source)
+    }
+
+    debug("Finished flushing metrics.")
+  }
+}
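
A wiring sketch for the snapshot reporter. Here `producer` is assumed to be an already-constructed SystemProducer for the "kafka" system, and the job/container names are placeholders:

    import org.apache.samza.metrics.MetricsRegistryMap
    import org.apache.samza.metrics.reporter.MetricsSnapshotReporter
    import org.apache.samza.system.SystemStream

    val reporter = new MetricsSnapshotReporter(
      producer,                                // assumed SystemProducer instance
      new SystemStream("kafka", "metrics"),
      "my-samza-job", "1", "container_0", "0.0.1", "0.0.1", "localhost")

    val registry = new MetricsRegistryMap
    reporter.register("samza-container-0", registry)
    reporter.start   // flushes a MetricsSnapshot per registered source every 60 seconds
    // ...
    reporter.stop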

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
new file mode 100644
index 0000000..f20dc36
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter
+
+import java.net.InetAddress
+import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
+import org.apache.samza.config.Config
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.MetricsConfig.Config2Metrics
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.config.SerializerConfig.Config2Serializer
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.metrics.MetricsReporter
+import org.apache.samza.metrics.MetricsReporterFactory
+import org.apache.samza.util.Util
+import org.apache.samza.metrics.ReadableMetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.serializers.Serializer
+import org.apache.samza.serializers.SerdeFactory
+import org.apache.samza.system.SystemFactory
+
+class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging {
+  def getMetricsReporter(name: String, containerName: String, config: Config): MetricsReporter = {
+    info("Creating new metrics snapshot reporter.")
+
+    val jobName = config
+      .getName
+      .getOrElse(throw new SamzaException("Job name must be defined in config."))
+
+    val jobId = config
+      .getJobId
+      .getOrElse(1.toString)
+
+    val taskClass = config
+      .getTaskClass
+      .getOrElse(throw new SamzaException("No task class defined for config."))
+
+    val version = Option(Class.forName(taskClass).getPackage.getImplementationVersion)
+      .getOrElse({
+        warn("Unable to find implementation version in jar's meta info. Defaulting to 0.0.1.")
+        "0.0.1"
+      })
+
+    val samzaVersion = Option(classOf[MetricsSnapshotReporterFactory].getPackage.getImplementationVersion)
+      .getOrElse({
+        warn("Unable to find samza implementation version in jar's meta info. Defaulting to 0.0.1.")
+        "0.0.1"
+      })
+
+    val metricsSystemStreamName = config
+      .getMetricsReporterStream(name)
+      .getOrElse(throw new SamzaException("No metrics stream defined in config."))
+
+    val systemStream = Util.getSystemStreamFromNames(metricsSystemStreamName)
+
+    info("Got system stream %s." format systemStream)
+
+    val systemName = systemStream.getSystem
+
+    val systemFactoryClassName = config
+      .getSystemFactory(systemName)
+      .getOrElse(throw new SamzaException("Trying to fetch system factory for system %s, which isn't defined in config." format systemName))
+
+    val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+
+    info("Got system factory %s." format systemFactory)
+
+    val registry = new MetricsRegistryMap
+
+    val producer = systemFactory.getProducer(systemName, config, registry)
+
+    info("Got producer %s." format producer)
+
+    val streamSerdeName = config.getStreamMsgSerde(systemStream)
+    val systemSerdeName = config.getSystemMsgSerde(systemName)
+    val serdeName = streamSerdeName.getOrElse(systemSerdeName.getOrElse(null))
+    val serde = if (serdeName != null) {
+      config.getSerdeClass(serdeName) match {
+        case Some(serdeName) =>
+          Util
+            .getObj[SerdeFactory[MetricsSnapshot]](serdeName)
+            .getSerde(serdeName, config)
+        case _ => null
+      }
+    } else {
+      null
+    }
+
+    info("Got serde %s." format serde)
+
+    val reporter = new MetricsSnapshotReporter(
+      producer,
+      systemStream,
+      jobName,
+      jobId,
+      containerName,
+      version,
+      samzaVersion,
+      InetAddress.getLocalHost().getHostName(),
+      serde)
+
+    reporter.register(getClass.getSimpleName, registry)
+
+    reporter
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
new file mode 100644
index 0000000..574c584
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import scala.collection.JavaConversions._
+import org.codehaus.jackson.map.ObjectMapper
+import org.apache.samza.system.SystemStream
+import org.apache.samza.checkpoint.Checkpoint
+
+class CheckpointSerde extends Serde[Checkpoint] {
+  val jsonMapper = new ObjectMapper()
+
+  def fromBytes(bytes: Array[Byte]): Checkpoint = {
+    try {
+      val checkpointMap = jsonMapper
+        .readValue(bytes, classOf[java.util.Map[String, java.util.Map[String, String]]])
+        .flatMap {
+          case (systemName, streamToOffsetMap) =>
+            streamToOffsetMap.map { case (streamName, offset) => (new SystemStream(systemName, streamName), offset) }
+        }
+      new Checkpoint(checkpointMap)
+    } catch {
+      case _: Exception => null
+    }
+  }
+
+  def toBytes(checkpoint: Checkpoint) = {
+    val offsetMap = asJavaMap(checkpoint
+      .getOffsets
+      // Convert Map[SystemStream, String] offset map to a iterable of tuples (system, stream, offset)
+      .map { case (systemStream, offset) => (systemStream.getSystem, systemStream.getStream, offset) }
+      // Group into a Map[String, (String, String, String)] by system
+      .groupBy(_._1)
+      // Group the tuples for each system into a Map[String, String] for stream to offsets
+      .map {
+        case (systemName, tuples) =>
+          val streamToOffsetMap = asJavaMap(tuples
+            // Group the tuples by stream name
+            .groupBy(_._2)
+            // There should only ever be one SystemStream to offset mapping, so just 
+            // grab the first element from the tuple list for each stream.
+            .map { case (streamName, tuples) => (streamName, tuples.head._3) }
+            .toMap)
+          (systemName, streamToOffsetMap)
+      }.toMap)
+
+    jsonMapper.writeValueAsBytes(offsetMap)
+  }
+}
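
A round-trip sketch, assuming Checkpoint accepts a map of SystemStream to last-read offset (as fromBytes above implies); stream names and offsets are placeholders:

    import scala.collection.JavaConversions._
    import org.apache.samza.checkpoint.Checkpoint
    import org.apache.samza.serializers.CheckpointSerde
    import org.apache.samza.system.SystemStream

    val serde = new CheckpointSerde
    val checkpoint = new Checkpoint(Map(new SystemStream("kafka", "PageViews") -> "1234"))
    val bytes = serde.toBytes(checkpoint)    // JSON: {"kafka":{"PageViews":"1234"}}
    val restored = serde.fromBytes(bytes)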

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
new file mode 100644
index 0000000..4f3ff6e
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.apache.samza.SamzaException
+import org.apache.samza.config.SerializerConfig
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.OutgoingMessageEnvelope
+import org.apache.samza.system.IncomingMessageEnvelope
+
+class SerdeManager(
+  serdes: Map[String, Serde[Object]] = Map(),
+  systemKeySerdes: Map[String, Serde[Object]] = Map(),
+  systemMessageSerdes: Map[String, Serde[Object]] = Map(),
+  systemStreamKeySerdes: Map[SystemStream, Serde[Object]] = Map(),
+  systemStreamMessageSerdes: Map[SystemStream, Serde[Object]] = Map(),
+  changeLogSystemStreams: Set[SystemStream] = Set()) {
+
+  def toBytes(obj: Object, serializerName: String) = serdes
+    .getOrElse(serializerName, throw new SamzaException("No serde defined for %s" format serializerName))
+    .toBytes(obj)
+
+  def toBytes(envelope: OutgoingMessageEnvelope): OutgoingMessageEnvelope = {
+    val key = if (changeLogSystemStreams.contains(envelope.getSystemStream)) {
+      // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
+      envelope.getKey
+    } else if (envelope.getKeySerializerName != null) {
+      // If a serde is defined, use it.
+      toBytes(envelope.getKey, envelope.getKeySerializerName)
+    } else if (systemStreamKeySerdes.contains(envelope.getSystemStream)) {
+      // If the stream has a serde defined, use it.
+      systemStreamKeySerdes(envelope.getSystemStream).toBytes(envelope.getKey)
+    } else if (systemKeySerdes.contains(envelope.getSystemStream.getSystem)) {
+      // If the system has a serde defined, use it.
+      systemKeySerdes(envelope.getSystemStream.getSystem).toBytes(envelope.getKey)
+    } else {
+      // Just use the object.
+      envelope.getKey
+    }
+
+    val message = if (changeLogSystemStreams.contains(envelope.getSystemStream)) {
+      // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
+      envelope.getMessage
+    } else if (envelope.getMessageSerializerName != null) {
+      // If a serde is defined, use it.
+      toBytes(envelope.getMessage, envelope.getMessageSerializerName)
+    } else if (systemStreamMessageSerdes.contains(envelope.getSystemStream)) {
+      // If the stream has a serde defined, use it.
+      systemStreamMessageSerdes(envelope.getSystemStream).toBytes(envelope.getMessage)
+    } else if (systemMessageSerdes.contains(envelope.getSystemStream.getSystem)) {
+      // If the system has a serde defined, use it.
+      systemMessageSerdes(envelope.getSystemStream.getSystem).toBytes(envelope.getMessage)
+    } else {
+      // Just use the object.
+      envelope.getMessage
+    }
+
+    new OutgoingMessageEnvelope(
+      envelope.getSystemStream,
+      null,
+      null,
+      envelope.getPartitionKey,
+      key,
+      message)
+  }
+
+  def fromBytes(bytes: Array[Byte], deserializerName: String) = serdes
+    .getOrElse(deserializerName, throw new SamzaException("No serde defined for %s" format deserializerName))
+    .fromBytes(bytes)
+
+  def fromBytes(envelope: IncomingMessageEnvelope) = {
+    val key = if (changeLogSystemStreams.contains(envelope.getSystemStreamPartition.getSystemStream)) {
+      // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
+      envelope.getKey
+    } else if (systemStreamKeySerdes.contains(envelope.getSystemStreamPartition)) {
+      // If the stream has a serde defined, use it.
+      systemStreamKeySerdes(envelope.getSystemStreamPartition).fromBytes(envelope.getKey.asInstanceOf[Array[Byte]])
+    } else if (systemKeySerdes.contains(envelope.getSystemStreamPartition.getSystem)) {
+      // If the system has a serde defined, use it.
+      systemKeySerdes(envelope.getSystemStreamPartition.getSystem).fromBytes(envelope.getKey.asInstanceOf[Array[Byte]])
+    } else {
+      // Just use the object.
+      envelope.getKey
+    }
+
+    val message = if (changeLogSystemStreams.contains(envelope.getSystemStreamPartition.getSystemStream)) {
+      // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
+      envelope.getMessage
+    } else if (systemStreamMessageSerdes.contains(envelope.getSystemStreamPartition)) {
+      // If the stream has a serde defined, use it.
+      systemStreamMessageSerdes(envelope.getSystemStreamPartition).fromBytes(envelope.getMessage.asInstanceOf[Array[Byte]])
+    } else if (systemMessageSerdes.contains(envelope.getSystemStreamPartition.getSystem)) {
+      // If the system has a serde defined, use it.
+      systemMessageSerdes(envelope.getSystemStreamPartition.getSystem).fromBytes(envelope.getMessage.asInstanceOf[Array[Byte]])
+    } else {
+      // Just use the object.
+      envelope.getMessage
+    }
+
+    new IncomingMessageEnvelope(
+      envelope.getSystemStreamPartition,
+      envelope.getOffset,
+      key,
+      message)
+  }
+}
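
A sketch of the named-serde path through SerdeManager; the cast is only for illustration, since the manager stores Serde[Object]:

    import org.apache.samza.serializers.{Serde, SerdeManager, StringSerde}

    val serdeManager = new SerdeManager(
      serdes = Map("string" -> new StringSerde("UTF-8").asInstanceOf[Serde[Object]]))

    val bytes = serdeManager.toBytes("hello", "string")      // looked up by serde name
    val restored = serdeManager.fromBytes(bytes, "string")   // "hello"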

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/serializers/StringSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/StringSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/StringSerde.scala
new file mode 100644
index 0000000..45cde95
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/StringSerde.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.nio.ByteBuffer
+import org.apache.samza.config.Config
+
+/**
+ * A serializer for strings
+ */
+class StringSerdeFactory extends SerdeFactory[String] {
+  def getSerde(name: String, config: Config): Serde[String] =
+    new StringSerde(config.get("encoding", "UTF-8"))
+}
+
+class StringSerde(val encoding: String) extends Serde[String] {
+  def toBytes(obj: String): Array[Byte] =
+    obj.getBytes(encoding)
+
+  def fromBytes(bytes: Array[Byte]): String =
+    new String(bytes, 0, bytes.size, encoding)
+}
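
A quick round trip with the string serde:

    import org.apache.samza.serializers.StringSerde

    val serde = new StringSerde("UTF-8")
    val bytes = serde.toBytes("hello, wörld")
    val decoded = serde.fromBytes(bytes)   // "hello, wörld"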

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
new file mode 100644
index 0000000..f4c0194
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage
+
+import java.io.File
+import scala.collection.Map
+import grizzled.slf4j.Logging
+import org.apache.samza.Partition
+import org.apache.samza.system.SystemConsumer
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.SystemStreamPartitionIterator
+import org.apache.samza.task.MessageCollector
+import org.apache.samza.util.Util
+
+object TaskStorageManager {
+  def getStoreDir(storeBaseDir: File, storeName: String) = {
+    new File(storeBaseDir, storeName)
+  }
+
+  def getStorePartitionDir(storeBaseDir: File, storeName: String, partition: Partition) = {
+    new File(storeBaseDir, storeName + File.separator + partition.getPartitionId)
+  }
+}
+
+/**
+ * Manage all the storage engines for a given task
+ */
+class TaskStorageManager(
+  partition: Partition,
+  taskStores: Map[String, StorageEngine] = Map(),
+  storeConsumers: Map[String, SystemConsumer] = Map(),
+  changeLogSystemStreams: Map[String, SystemStream] = Map(),
+  storeBaseDir: File = new File(System.getProperty("user.dir"), "state")) extends Logging {
+
+  def apply(storageEngineName: String) = taskStores(storageEngineName)
+
+  def init(collector: MessageCollector) {
+    cleanBaseDirs
+    startConsumers
+    restoreStores(collector)
+    stopConsumers
+  }
+
+  private def cleanBaseDirs {
+    debug("Cleaning base directories for stores.")
+
+    taskStores.keys.foreach(storeName => {
+      val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, partition)
+
+      debug("Cleaning %s for store %s." format (storagePartitionDir, storeName))
+
+      Util.rm(storagePartitionDir)
+      storagePartitionDir.mkdirs
+    })
+  }
+
+  private def startConsumers {
+    debug("Starting consumers for stores.")
+
+    for ((storeName, systemStream) <- changeLogSystemStreams) {
+      val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+      val consumer = storeConsumers(storeName)
+
+      debug("Registering consumer for system stream partition %s." format systemStreamPartition)
+
+      consumer.register(systemStreamPartition, null)
+    }
+
+    storeConsumers.values.foreach(_.start)
+  }
+
+  private def restoreStores(collector: MessageCollector) {
+    debug("Restoring stores.")
+
+    for ((storeName, store) <- taskStores) {
+      if (changeLogSystemStreams.contains(storeName)) {
+        val systemStream = changeLogSystemStreams(storeName)
+        val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+        val systemConsumer = storeConsumers(storeName)
+        val systemConsumerIterator = new SystemStreamPartitionIterator(systemConsumer, systemStreamPartition);
+        store.restore(systemConsumerIterator)
+      }
+    }
+  }
+
+  private def stopConsumers {
+    debug("Stopping consumers for stores.")
+
+    storeConsumers.values.foreach(_.stop)
+  }
+
+  def flush() {
+    debug("Flushing stores.")
+
+    taskStores.values.foreach(_.flush)
+  }
+
+  def stop() {
+    debug("Stopping stores.")
+
+    taskStores.values.foreach(_.stop)
+  }
+}
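
A minimal lifecycle sketch. With the default (empty) store and consumer maps every step is a no-op, but it shows the call order a container would use; Partition is assumed to take the partition id as an Int:

    import org.apache.samza.Partition
    import org.apache.samza.storage.TaskStorageManager
    import org.apache.samza.task.ReadableCollector

    val storageManager = new TaskStorageManager(new Partition(0))
    storageManager.init(new ReadableCollector)   // clean dirs, start consumers, restore, stop consumers
    storageManager.flush()
    storageManager.stop()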

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/system/DefaultPicker.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/DefaultPicker.scala b/samza-core/src/main/scala/org/apache/samza/system/DefaultPicker.scala
new file mode 100644
index 0000000..8bad75c
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/DefaultPicker.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system
+
+import java.util.ArrayDeque
+
+class DefaultPicker extends IncomingMessageEnvelopePicker {
+  val q = new ArrayDeque[IncomingMessageEnvelope]()
+  def update(envelope: IncomingMessageEnvelope) = q.add(envelope)
+  def pick = q.poll
+}
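
A short sketch of the picker's FIFO behavior; Partition is assumed to take the partition id as an Int, and the stream names are placeholders:

    import org.apache.samza.Partition
    import org.apache.samza.system.{DefaultPicker, IncomingMessageEnvelope, SystemStream, SystemStreamPartition}

    val picker = new DefaultPicker
    val ssp = new SystemStreamPartition(new SystemStream("kafka", "PageViews"), new Partition(0))
    picker.update(new IncomingMessageEnvelope(ssp, "0", null, "some message"))
    val next = picker.pick   // the envelope above; pick returns null once the queue is empty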

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
new file mode 100644
index 0000000..2e6f3b8
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.Queue
+import grizzled.slf4j.Logging
+import org.apache.samza.serializers.SerdeManager
+
+class SystemConsumers(
+  picker: IncomingMessageEnvelopePicker,
+  consumers: Map[String, SystemConsumer],
+  serdeManager: SerdeManager,
+  maxMsgsPerStreamPartition: Int = 1000,
+  noNewMessagesTimeout: Long = 10) extends Logging {
+
+  // TODO add metrics
+
+  var unprocessedMessages = Map[SystemStreamPartition, Queue[IncomingMessageEnvelope]]()
+  var neededByPicker = Set[SystemStreamPartition]()
+  var fetchMap = Map[SystemStreamPartition, java.lang.Integer]()
+  var timeout = noNewMessagesTimeout
+
+  debug("Got stream consumers: %s" format consumers)
+  debug("Got max messages per stream: %s" format maxMsgsPerStreamPartition)
+  debug("Got no new message timeout: %s" format noNewMessagesTimeout)
+
+  def start {
+    debug("Starting consumers.")
+
+    consumers.values.foreach(_.start)
+  }
+
+  def stop {
+    debug("Stopping consumers.")
+
+    consumers.values.foreach(_.stop)
+  }
+
+  def register(systemStreamPartition: SystemStreamPartition, lastReadOffset: String) {
+    debug("Registering stream: %s, %s" format (systemStreamPartition, lastReadOffset))
+
+    neededByPicker += systemStreamPartition
+    fetchMap += systemStreamPartition -> maxMsgsPerStreamPartition
+    unprocessedMessages += systemStreamPartition -> Queue[IncomingMessageEnvelope]()
+    consumers(systemStreamPartition.getSystem).register(systemStreamPartition, lastReadOffset)
+  }
+
+  def pick = {
+    val picked = picker.pick
+
+    if (picked == null) {
+      debug("Picker returned null.")
+
+      // Allow blocking if the picker didn't pick a message.
+      timeout = noNewMessagesTimeout
+    } else {
+      debug("Picker returned an incoming message envelope: %s" format picked)
+
+      // Don't block if we have a message to process.
+      timeout = 0
+
+      // Ok to give the picker a new message from this stream.
+      neededByPicker += picked.getSystemStreamPartition
+    }
+
+    refresh
+    picked
+  }
+
+  private def refresh {
+    debug("Refreshing picker with new messages.")
+
+    // Poll every system for new messages.
+    consumers.keys.foreach(poll(_))
+
+    // Update the picker.
+    neededByPicker.foreach(systemStreamPartition =>
+      // If we have messages for a stream that the picker needs, then update.
+      if (fetchMap(systemStreamPartition).intValue < maxMsgsPerStreamPartition) {
+        picker.update(unprocessedMessages(systemStreamPartition).dequeue)
+        fetchMap += systemStreamPartition -> (fetchMap(systemStreamPartition).intValue + 1)
+        neededByPicker -= systemStreamPartition
+      })
+  }
+
+  private def poll(systemName: String) = {
+    debug("Polling system consumer: %s" format systemName)
+
+    val consumer = consumers(systemName)
+
+    debug("Filtering for system: %s, %s" format (systemName, fetchMap))
+
+    val systemFetchMap = fetchMap.filterKeys(_.getSystem.equals(systemName))
+
+    debug("Fetching: %s" format systemFetchMap)
+
+    val incomingEnvelopes = consumer.poll(systemFetchMap, timeout)
+
+    debug("Got incoming message envelopes: %s" format incomingEnvelopes)
+
+    // We have new un-processed envelopes, so update maps accordingly.
+    incomingEnvelopes.foreach(envelope => {
+      val systemStreamPartition = envelope.getSystemStreamPartition
+
+      debug("Got message for: %s, %s" format (systemStreamPartition, envelope))
+
+      fetchMap += systemStreamPartition -> (fetchMap(systemStreamPartition).intValue - 1)
+
+      debug("Updated fetch map for: %s, %s" format (systemStreamPartition, fetchMap))
+
+      unprocessedMessages(envelope.getSystemStreamPartition).enqueue(serdeManager.fromBytes(envelope))
+
+      debug("Updated unprocessed messages for: %s, %s" format (systemStreamPartition, unprocessedMessages))
+    })
+  }
+}
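
A wiring sketch for SystemConsumers. Here `consumers` is assumed to be a Map[String, SystemConsumer] keyed by system name, and `ssp` a SystemStreamPartition belonging to one of those systems:

    import org.apache.samza.serializers.SerdeManager
    import org.apache.samza.system.{DefaultPicker, SystemConsumers}

    val systemConsumers = new SystemConsumers(new DefaultPicker, consumers, new SerdeManager)
    systemConsumers.register(ssp, "0")          // last-read offset "0" is a placeholder
    systemConsumers.start
    val envelope = systemConsumers.pick         // null if nothing arrived within the timeout
    systemConsumers.stop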

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
new file mode 100644
index 0000000..738d04f
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system
+
+import org.apache.samza.serializers.SerdeManager
+
+class SystemProducers(
+  producers: Map[String, SystemProducer],
+  serdeManager: SerdeManager) {
+
+  // TODO add metrics and logging
+
+  def start {
+    producers.values.foreach(_.start)
+  }
+
+  def stop {
+    producers.values.foreach(_.stop)
+  }
+
+  def register(source: String) {
+    producers.values.foreach(_.register(source))
+  }
+
+  def commit(source: String) {
+    producers.values.foreach(_.commit(source))
+  }
+
+  def send(source: String, envelope: OutgoingMessageEnvelope) {
+    producers(envelope.getSystemStream.getSystem).send(source, serdeManager.toBytes(envelope))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/task/ReadableCollector.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/task/ReadableCollector.scala b/samza-core/src/main/scala/org/apache/samza/task/ReadableCollector.scala
new file mode 100644
index 0000000..0ec79da
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/task/ReadableCollector.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task
+
+import scala.collection.mutable
+
+import org.apache.samza.system.OutgoingMessageEnvelope
+
+/** An in-memory implementation of MessageCollector that stores all outgoing messages in a list */
+class ReadableCollector extends MessageCollector {
+  var envelopes = new mutable.ArrayBuffer[OutgoingMessageEnvelope]()
+
+  def send(envelope: OutgoingMessageEnvelope) {
+    envelopes += envelope
+  }
+
+  def getEnvelopes = envelopes
+
+  def reset() = envelopes.clear
+}
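
ReadableCollector is convenient in unit tests: the code under test can be handed the collector as its MessageCollector, and the test then inspects the buffered output. A minimal sketch, assuming OutgoingMessageEnvelope can be built from a SystemStream plus a message payload:

    object ReadableCollectorSketch extends App {
      import org.apache.samza.system.{ OutgoingMessageEnvelope, SystemStream }
      import org.apache.samza.task.ReadableCollector

      val collector = new ReadableCollector

      // The code under test would normally call send; we call it directly here.
      collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "output-topic"), "hello"))
      assert(collector.getEnvelopes.size == 1)

      collector.reset()
      assert(collector.getEnvelopes.isEmpty)
    }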

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
new file mode 100644
index 0000000..aaf631e
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task
+
+/** An in-memory implementation of TaskCoordinator that records commit and shutdown requests as flags */
+class ReadableCoordinator extends TaskCoordinator {
+  var commitRequested = false
+  var shutdownRequested = false
+
+  def commit { commitRequested = true }
+
+  def isCommitRequested = commitRequested
+
+  def shutdown { shutdownRequested = true }
+
+  def isShutdownRequested = shutdownRequested
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala b/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala
new file mode 100644
index 0000000..04e67a2
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util
+
+import java.util.concurrent.ThreadFactory
+
+class DaemonThreadFactory extends ThreadFactory {
+  def newThread(r: Runnable) = {
+    val thread = new Thread(r)
+    thread.setDaemon(true)
+    thread
+  }
+}
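
DaemonThreadFactory simply marks every thread it creates as a daemon, so executor pools built with it never keep the JVM alive after the main thread exits. A small standalone sketch using only the JDK (the object name is illustrative):

    import java.util.concurrent.Executors
    import org.apache.samza.util.DaemonThreadFactory

    object DaemonExecutorSketch extends App {
      // Threads created through the factory are daemons, so the JVM can exit
      // even though this pool is never shut down explicitly.
      val executor = Executors.newSingleThreadExecutor(new DaemonThreadFactory)

      executor.submit(new Runnable {
        def run { println("daemon thread? " + Thread.currentThread.isDaemon) }
      }).get
    }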

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
new file mode 100644
index 0000000..8386324
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util
+
+import java.io.File
+import java.net.InetAddress
+import java.net.UnknownHostException
+import java.util.Random
+import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
+import org.apache.samza.config.Config
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.TaskConfig.Config2Task
+import scala.collection.JavaConversions._
+import java.util.concurrent.ThreadFactory
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.system.SystemStream
+
+object Util extends Logging {
+  val random = new Random
+
+  /**
+   * Make an environment variable string safe to pass.
+   */
+  def envVarEscape(str: String) = str.replace("\"", "\\\"").replace("'", "\\'")
+
+  /**
+   * Get a random number >= startInclusive, and < endExclusive.
+   */
+  def randomBetween(startInclusive: Int, endExclusive: Int) =
+    startInclusive + random.nextInt(endExclusive - startInclusive)
+
+  /**
+   * Recursively remove a directory (or file), and all sub-directories. Equivalent
+   * to rm -rf.
+   */
+  def rm(file: File) {
+    if (file == null) {
+      return
+    } else if (file.isDirectory) {
+      val files = file.listFiles()
+      if (files != null) {
+        for (f <- files)
+          rm(f)
+      }
+      file.delete()
+    } else {
+      file.delete()
+    }
+  }
+
+  /**
+   * Instantiate a class instance from a given className.
+   */
+  def getObj[T](className: String) = {
+    Class
+      .forName(className)
+      .newInstance
+      .asInstanceOf[T]
+  }
+
+  /**
+   * Uses config to create a SystemAdmin for each input stream's system,
+   * fetches every input stream's partitions, and returns the union of all
+   * partitions across the input streams. Input streams with partitions {0, 1}
+   * and {0, 1, 2, 3} would result in this method returning partitions 0-3.
+   */
+  def getMaxInputStreamPartitions(config: Config) = {
+    val inputStreams = config.getInputStreams
+    val systemNames = config.getSystemNames
+    val systemAdmins = systemNames.map(systemName => {
+      val systemFactoryClassName = config
+        .getSystemFactory(systemName)
+        .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
+      val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+      val systemAdmin = systemFactory.getAdmin(systemName, config)
+      (systemName, systemAdmin)
+    }).toMap
+    inputStreams.flatMap(systemStream => {
+      systemAdmins.get(systemStream.getSystem) match {
+        case Some(sysAdmin) => sysAdmin.getPartitions(systemStream.getStream)
+        case None => throw new IllegalArgumentException("Could not find a stream admin for system '" + systemStream.getSystem + "'")
+      }
+    }).toSet
+  }
+
+  /**
+   * Returns a SystemStream object based on the system stream name given. For 
+   * example, kafka.topic would return new SystemStream("kafka", "topic").
+   */
+  def getSystemStreamFromNames(systemStreamNames: String): SystemStream = {
+    val idx = systemStreamNames.indexOf('.')
+    if (idx < 0)
+      throw new IllegalArgumentException("No '.' in stream name '" + systemStreamNames + "'. Stream names should be in the form 'system.stream'")
+    new SystemStream(systemStreamNames.substring(0, idx), systemStreamNames.substring(idx + 1, systemStreamNames.length))
+  }
+  
+  /**
+   * Returns the "system.stream" name for a given SystemStream object. For
+   * example, new SystemStream("kafka", "topic") would return "kafka.topic".
+   */
+  def getNameFromSystemStream(systemStream: SystemStream) = {
+    systemStream.getSystem + "." + systemStream.getStream
+  }
+}
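
The two helpers at the bottom are inverses of each other, and getSystemStreamFromNames splits on the first '.' only, so stream names may themselves contain dots. A quick sketch using only the code above:

    object UtilSketch extends App {
      import org.apache.samza.system.SystemStream
      import org.apache.samza.util.Util

      // "system.stream" is split on the first dot.
      val ss = Util.getSystemStreamFromNames("kafka.my.topic")
      assert(ss.getSystem == "kafka")
      assert(ss.getStream == "my.topic")

      // getNameFromSystemStream reverses the mapping.
      assert(Util.getNameFromSystemStream(ss) == "kafka.my.topic")
    }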

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/test/resources/test.properties
----------------------------------------------------------------------
diff --git a/samza-core/src/test/resources/test.properties b/samza-core/src/test/resources/test.properties
new file mode 100644
index 0000000..9348c7d
--- /dev/null
+++ b/samza-core/src/test/resources/test.properties
@@ -0,0 +1,24 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+job.factory.class=org.apache.samza.job.MockJobFactory
+job.name=test-job
+foo=bar

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
new file mode 100644
index 0000000..50d9a05
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.file
+
+import java.io.File
+import scala.collection.JavaConversions._
+import java.util.Random
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.SamzaException
+import org.apache.samza.Partition
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.system.SystemStream
+
+class TestFileSystemCheckpointManager {
+  val checkpointRoot = System.getProperty("java.io.tmpdir")
+
+  @Test
+  def testReadForCheckpointFileThatDoesNotExistShouldReturnNull {
+    val cpm = new FileSystemCheckpointManager("some-job-name", new File(checkpointRoot))
+    assert(cpm.readLastCheckpoint(new Partition(1)) == null)
+  }
+
+  @Test
+  def testReadForCheckpointFileThatDoesExistShouldReturnProperCheckpoint {
+    val cpm = new FileSystemCheckpointManager("some-job-name", new File(checkpointRoot))
+    val partition = new Partition(2)
+    val cp = new Checkpoint(Map(
+      new SystemStream("a", "b") -> "c",
+      new SystemStream("a", "c") -> "d",
+      new SystemStream("b", "d") -> "e"))
+    cpm.start
+    cpm.writeCheckpoint(partition, cp)
+    val readCp = cpm.readLastCheckpoint(partition)
+    cpm.stop
+    assert(readCp.equals(cp))
+  }
+
+  @Test
+  def testMissingRootDirectoryShouldFailOnManagerCreation {
+    val cpm = new FileSystemCheckpointManager("some-job-name", new File(checkpointRoot + new Random().nextInt))
+    try {
+      cpm.start
+      fail("Expected an exception since root directory for fs checkpoint manager doesn't exist.")
+    } catch {
+      case e: SamzaException => None // this is expected
+    }
+    cpm.stop
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala b/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala
new file mode 100644
index 0000000..f254741
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config.factories
+import java.net.URI
+import java.io.File
+import org.apache.samza.SamzaException
+import org.junit.Assert._
+import org.junit.Test
+
+class TestPropertiesConfigFactory {
+  val factory = new PropertiesConfigFactory()
+
+  @Test
+  def testCanReadPropertiesConfigFiles {
+    val config = factory.getConfig(URI.create("file://%s/src/test/resources/test.properties" format new File(".").getCanonicalPath))
+    assert("bar".equals(config.get("foo")))
+  }
+
+  @Test
+  def testCanNotReadNonLocalPropertiesConfigFiles {
+    try {
+      factory.getConfig(URI.create("hdfs://foo"))
+      fail("should have gotten a samza exception")
+    } catch {
+      case e: SamzaException => None // Do nothing
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
new file mode 100644
index 0000000..21d8a78
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job
+import java.io.File
+import org.apache.samza.config.Config
+import org.junit.Assert._
+import org.junit.Test
+
+object TestJobRunner {
+  var processCount = 0
+}
+
+class TestJobRunner {
+  @Test
+  def testJobRunnerWorks {
+    JobRunner.main(Array(
+      "--config-factory",
+      "org.apache.samza.config.factories.PropertiesConfigFactory",
+      "--config-path",
+      "file://%s/src/test/resources/test.properties" format new File(".").getCanonicalPath))
+    assert(TestJobRunner.processCount == 1)
+  }
+}
+
+class MockJobFactory extends StreamJobFactory {
+  def getJob(config: Config): StreamJob = {
+    return new StreamJob {
+      def submit() = { TestJobRunner.processCount += 1; this }
+      def kill() = this
+      def waitForFinish(timeoutMs: Long) = ApplicationStatus.SuccessfulFinish
+      def waitForStatus(status: ApplicationStatus, timeoutMs: Long) = status
+      def getStatus() = ApplicationStatus.SuccessfulFinish
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
new file mode 100644
index 0000000..d56024d
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.local
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.job.ApplicationStatus
+
+class TestProcessJob {
+  @Test
+  def testProcessJobShouldFinishOnItsOwn {
+    val builder = new ProcessBuilder("sleep", "1")
+    val job = new ProcessJob(builder)
+    job.submit
+    job.waitForFinish(999999)
+  }
+
+  @Test
+  def testProcessJobKillShouldWork {
+    val builder = new ProcessBuilder("sleep", "999999999")
+    val job = new ProcessJob(builder)
+    job.submit
+    job.waitForFinish(500)
+    job.kill
+    job.waitForFinish(999999)
+    assert(ApplicationStatus.UnsuccessfulFinish.equals(job.waitForFinish(999999999)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
new file mode 100644
index 0000000..7d45889
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.local
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.job.ApplicationStatus
+
+class TestThreadJob {
+  @Test
+  def testThreadJobShouldFinishOnItsOwn {
+    val job = new ThreadJob(new Runnable {
+      override def run {
+      }
+    })
+    job.submit
+    job.waitForFinish(999999)
+  }
+
+  @Test
+  def testThreadJobKillShouldWork {
+    val job = new ThreadJob(new Runnable {
+      override def run {
+        Thread.sleep(999999)
+      }
+    })
+    job.submit
+    job.waitForFinish(500)
+    job.kill
+    job.waitForFinish(999999)
+    assert(ApplicationStatus.UnsuccessfulFinish.equals(job.waitForFinish(999999999)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala b/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
new file mode 100644
index 0000000..f5594d0
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics
+
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.config.MapConfig
+import grizzled.slf4j.Logging
+import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}
+import java.io.IOException
+
+
+class TestJmxServer extends Logging {
+  @Test
+  def serverStartsUp {
+    var jmxServer: JmxServer = null
+
+    try {
+      jmxServer = new JmxServer
+
+      println("Got jmxServer on port " + jmxServer.getPort)
+
+      val jmxURL = new JMXServiceURL(jmxServer.getJmxUrl)
+      var jmxConnector: JMXConnector = null
+      try {
+        jmxConnector = JMXConnectorFactory.connect(jmxURL, null)
+        val connection = jmxConnector.getMBeanServerConnection()
+        assertTrue("Connected but mbean count is somehow 0", connection.getMBeanCount.intValue() > 0)
+      } catch {
+        case ioe: IOException => fail("Couldn't open connection to local JMX server")
+      } finally {
+        if (jmxConnector != null) jmxConnector.close
+      }
+
+    } finally {
+      if (jmxServer != null) jmxServer.stop
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala b/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
new file mode 100644
index 0000000..8827697
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter
+
+import org.junit.Assert._
+import org.junit.AfterClass
+import org.junit.BeforeClass
+import org.junit.Test
+import scala.collection.JavaConversions._
+import org.apache.samza.task.TaskContext
+import javax.management.remote.JMXConnectorFactory
+import org.apache.samza.metrics.MetricsRegistryMap
+import javax.management.remote.JMXConnectorServerFactory
+import javax.management.remote.JMXConnectorServer
+import java.rmi.registry.LocateRegistry
+import javax.management.remote.JMXServiceURL
+import org.apache.samza.config.MapConfig
+import java.lang.management.ManagementFactory
+import org.apache.samza.Partition
+import javax.management.ObjectName
+import org.apache.samza.metrics.JvmMetrics
+
+object TestJmxReporter {
+  val port = 4500
+  val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:%d/jmxapitestrmi" format port)
+  var cs: JMXConnectorServer = null
+
+  @BeforeClass
+  def beforeSetupServers {
+    LocateRegistry.createRegistry(port)
+    val mbs = ManagementFactory.getPlatformMBeanServer()
+    cs = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs)
+    cs.start
+  }
+
+  @AfterClass
+  def afterCleanLogDirs {
+    if (cs != null) {
+      cs.stop
+    }
+  }
+}
+
+class TestJmxReporter {
+  import TestJmxReporter.url
+
+  @Test
+  def testJmxReporter {
+    val registry = new MetricsRegistryMap
+    val jvm = new JvmMetrics("test", registry)
+    val reporter = new JmxReporterFactory().getMetricsReporter("", "", new MapConfig(Map[String, String]()))
+
+    reporter.register("test", registry)
+    reporter.start
+    jvm.run
+
+    val mbserver = JMXConnectorFactory.connect(url).getMBeanServerConnection
+    val stateViaJMX = mbserver.getAttribute(new ObjectName("test:type=test,name=MemNonHeapUsedM"), "Value").asInstanceOf[Float]
+
+    assertTrue(stateViaJMX > 0)
+
+    reporter.stop
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
new file mode 100644
index 0000000..c45ed9b
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task
+
+import org.junit.Assert._
+import org.junit.Test
+
+class TestReadableCoordinator {
+  @Test
+  def testCommit {
+    val coord = new ReadableCoordinator
+    assert(!coord.isCommitRequested)
+    coord.commit
+    assert(coord.isCommitRequested)
+    coord.commit
+    assert(coord.isCommitRequested)
+  }
+
+  @Test
+  def testShutdown {
+    val coord = new ReadableCoordinator
+    assert(!coord.isShutdownRequested)
+    coord.shutdown
+    assert(coord.isShutdownRequested)
+    coord.shutdown
+    assert(coord.isShutdownRequested)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/lib/kafka_2.8.1-0.8.1-SNAPSHOT-test.jar
----------------------------------------------------------------------
diff --git a/samza-kafka/lib/kafka_2.8.1-0.8.1-SNAPSHOT-test.jar b/samza-kafka/lib/kafka_2.8.1-0.8.1-SNAPSHOT-test.jar
new file mode 100644
index 0000000..ce88269
Binary files /dev/null and b/samza-kafka/lib/kafka_2.8.1-0.8.1-SNAPSHOT-test.jar differ