You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2013/08/12 18:21:38 UTC

[08/15] initial import.

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/FileSystemCheckpointManagerConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/FileSystemCheckpointManagerConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/FileSystemCheckpointManagerConfig.scala
new file mode 100644
index 0000000..707ea59
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/FileSystemCheckpointManagerConfig.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+object FileSystemCheckpointManagerConfig {
+  // Key under which the file system checkpoint manager's root path is configured.
+  val CHECKPOINT_MANAGER_ROOT = "task.checkpoint.path"
+  // Importing this implicit lets a plain Config be used as a FileSystemCheckpointManagerConfig.
+  implicit def Config2FSCP(config: Config): FileSystemCheckpointManagerConfig = new FileSystemCheckpointManagerConfig(config)
+}
+
+class FileSystemCheckpointManagerConfig(config: Config) extends ScalaMapConfig(config) {
+  def getFileSystemCheckpointRoot: Option[String] = getOption(FileSystemCheckpointManagerConfig.CHECKPOINT_MANAGER_ROOT) // None when "task.checkpoint.path" is unset
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
new file mode 100644
index 0000000..150ea93
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+object JobConfig {
+  // job config constants; "%s" in a key is replaced with a config rewriter name
+  val STREAM_JOB_FACTORY_CLASS = "job.factory.class" // streaming.job_factory_class
+  val CONFIG_REWRITERS = "job.config.rewriters" // CSV list of config rewriter classes to apply
+  val CONFIG_REWRITER_CLASS = "job.config.rewriter.%s.class" // looked up per rewriter name
+  val JOB_NAME = "job.name" // streaming.job_name
+  val JOB_ID = "job.id" // streaming.job_id
+  // Importing this implicit lets a plain Config be used as a JobConfig.
+  implicit def Config2Job(config: Config): JobConfig = new JobConfig(config)
+}
+
+class JobConfig(config: Config) extends ScalaMapConfig(config) {
+  // Every accessor returns None when its key is not present in the config.
+  def getName: Option[String] = getOption(JobConfig.JOB_NAME)
+  def getStreamJobFactoryClass: Option[String] = getOption(JobConfig.STREAM_JOB_FACTORY_CLASS)
+
+  def getJobId: Option[String] = getOption(JobConfig.JOB_ID)
+
+  def getConfigRewriters: Option[String] = getOption(JobConfig.CONFIG_REWRITERS)
+  // `name` is substituted for "%s" in "job.config.rewriter.%s.class".
+  def getConfigRewriterClass(name: String): Option[String] = getOption(JobConfig.CONFIG_REWRITER_CLASS format name)
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
new file mode 100644
index 0000000..f7a11c5
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+import scala.collection.JavaConversions._
+
+object MetricsConfig {
+  // metrics config constants; "%s" in a key is replaced with a reporter name
+  val METRICS_REPORTERS = "metrics.reporters"
+  val METRICS_REPORTER_FACTORY = "metrics.reporter.%s.class"
+  val METRICS_SNAPSHOT_REPORTER_STREAM = "metrics.reporter.%s.stream"
+  // Importing this implicit lets a plain Config be used as a MetricsConfig.
+  implicit def Config2Metrics(config: Config): MetricsConfig = new MetricsConfig(config)
+}
+
+class MetricsConfig(config: Config) extends ScalaMapConfig(config) {
+  // Raw comma-separated value of "metrics.reporters", if set.
+  def getMetricsReporters(): Option[String] = getOption(MetricsConfig.METRICS_REPORTERS)
+
+  // Value of "metrics.reporter.<name>.class", if set.
+  def getMetricsFactoryClass(name: String): Option[String] = getOption(MetricsConfig.METRICS_REPORTER_FACTORY format name)
+
+  // Value of "metrics.reporter.<name>.stream", if set.
+  def getMetricsReporterStream(name: String): Option[String] = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM format name)
+
+  /**
+   * Returns the reporter names listed under "metrics.reporters", a comma-separated
+   * value. Names are trimmed, and blank entries (e.g. from "a,,b" or "a, ") are
+   * dropped so callers never look up a reporter with an empty name.
+   *
+   * @return the names in the order they were listed; empty if the key is unset or blank
+   */
+  def getMetricReporterNames(): List[String] =
+    getMetricsReporters()
+      .map(_.split(",").map(_.trim).filter(_.nonEmpty).toList)
+      .getOrElse(List[String]())
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/ScalaMapConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/ScalaMapConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ScalaMapConfig.scala
new file mode 100644
index 0000000..5fb1f52
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/ScalaMapConfig.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+import org.apache.samza.SamzaException
+
+class ScalaMapConfig(config: Config) extends MapConfig(config) {
+  // Value for `k`, or `els` when getOption(k) is empty.
+  def getOrElse(k: String, els: String): String = getOption(k).getOrElse(els)
+
+  // Option(...) maps a null value to None, so callers never receive Some(null).
+  def getOption(k: String): Option[String] = if (containsKey(k)) Option(config.get(k)) else None
+  // Value for `k`; throws SamzaException naming the key (and `msg`, when given) if it is unset.
+  def getExcept(k: String, msg: String = null): String =
+    getOption(k).getOrElse {
+      val error =
+        if (msg == null) "Missing required configuration '%s'".format(k)
+        else "Missing required configuration '%s': %s".format(k, msg)
+      throw new SamzaException(error)
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala
new file mode 100644
index 0000000..0c026e6
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+import scala.collection.JavaConversions._
+
+object SerializerConfig {
+  // serializer config constants; "%s" in a key is replaced with a serde name
+  val SERIALIZER_PREFIX = "serializers.registry.%s"
+  val SERDE = "serializers.registry.%s.class"
+  // Importing this implicit lets a plain Config be used as a SerializerConfig.
+  implicit def Config2Serializer(config: Config): SerializerConfig = new SerializerConfig(config)
+}
+
+class SerializerConfig(config: Config) extends ScalaMapConfig(config) {
+  def getSerdeClass(name: String) = getOption(SerializerConfig.SERDE format name)
+
+  /**
+   * Returns all serializer names from the config file: keys under "serializers.registry."
+   * ending in ".class", minus that one trailing suffix (names containing ".class" stay intact).
+   */
+  def getSerdeNames() = {
+    val subConf = config.subset("serializers.registry.", true)
+    subConf.keys.filter(k => k.endsWith(".class")).map(_.stripSuffix(".class"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
new file mode 100644
index 0000000..fd8dab8
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+object ShellCommandConfig {
+  /**
+   * This environment variable is used to store a JSON serialized map of all configuration.
+   */
+  val ENV_CONFIG = "STREAMING_CONFIG"
+
+  /**
+   * A CSV list of partition IDs that a TaskRunner is responsible for (e.g. 0,2,4,6).
+   */
+  val ENV_PARTITION_IDS = "PARTITION_IDS"
+
+  /**
+   * A name for a TaskRunner.
+   */
+  val ENV_TASK_NAME = "TASK_NAME"
+
+  /**
+   * Arguments to be passed to the process running the TaskRunner (or equivalent, for non JVM languages).
+   */
+  val ENV_SAMZA_OPTS = "SAMZA_OPTS"
+
+  val COMMAND_SHELL_EXECUTE = "task.execute" // config key read by getCommand
+  val TASK_JVM_OPTS = "task.opts" // config key read by getTaskOpts
+  // Importing this implicit lets a plain Config be used as a ShellCommandConfig.
+  implicit def Config2ShellCommand(config: Config): ShellCommandConfig = new ShellCommandConfig(config)
+}
+
+class ShellCommandConfig(config: Config) extends ScalaMapConfig(config) {
+  // Value of "task.execute", falling back to "bin/run-task.sh" when the key is unset.
+  def getCommand = getOrElse(ShellCommandConfig.COMMAND_SHELL_EXECUTE, "bin/run-task.sh")
+  def getTaskOpts = getOption(ShellCommandConfig.TASK_JVM_OPTS) // value of "task.opts", if set
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
new file mode 100644
index 0000000..abcb034
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import scala.collection.JavaConversions._
+
+import grizzled.slf4j.Logging
+
+object StorageConfig {
+  // storage config constants; "%s" in a key is replaced with a store name
+  val FACTORY = "stores.%s.factory"
+  val KEY_SERDE = "stores.%s.key.serde"
+  val MSG_SERDE = "stores.%s.msg.serde"
+  val CHANGELOG_STREAM = "stores.%s.changelog"
+  // Importing this implicit lets a plain Config be used as a StorageConfig.
+  implicit def Config2Storage(config: Config): StorageConfig = new StorageConfig(config)
+}
+
+class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging {
+  import StorageConfig._
+  // Per-store lookups: `name` fills the "%s" slot of the matching "stores.%s.*" key.
+  def getStorageFactoryClassName(name: String) = getOption(FACTORY format name)
+  def getStorageKeySerde(name: String) = getOption(KEY_SERDE format name)
+  def getStorageMsgSerde(name: String) = getOption(MSG_SERDE format name)
+  def getChangelogStream(name: String) = getOption(CHANGELOG_STREAM format name)
+  // Store names are the "stores." subset keys ending in ".factory", with that suffix cut off.
+  def getStoreNames: Seq[String] =
+    config.subset("stores.", true).keys.collect { case k if k.endsWith(".factory") => k.dropRight(".factory".length) }.toSeq
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
new file mode 100644
index 0000000..517e9ae
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import grizzled.slf4j.Logging
+import scala.collection.JavaConversions._
+import org.apache.samza.SamzaException
+import org.apache.samza.util.Util
+import org.apache.samza.system.SystemStream
+
+object StreamConfig {
+  // stream config constants; the two "%s" slots take a system name, then a stream name
+  val STREAM_PREFIX = "systems.%s.streams.%s."
+  val MSG_SERDE = STREAM_PREFIX + "samza.msg.serde"
+  val KEY_SERDE = STREAM_PREFIX + "samza.key.serde"
+  val CONSUMER_RESET_OFFSET = STREAM_PREFIX + "samza.reset.offset"
+  // Importing this implicit lets a plain Config be used as a StreamConfig.
+  implicit def Config2Stream(config: Config): StreamConfig = new StreamConfig(config)
+}
+
+class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
+  def getStreamMsgSerde(systemStream: SystemStream) =
+    getOption(StreamConfig.MSG_SERDE format (systemStream.getSystem, systemStream.getStream))
+
+  def getStreamKeySerde(systemStream: SystemStream) =
+    getOption(StreamConfig.KEY_SERDE format (systemStream.getSystem, systemStream.getStream))
+
+  // Maps each stream of `systemName` that has a ".samza.reset.offset" key to its parsed flag.
+  def getResetOffsetMap(systemName: String) = {
+    val suffix = ".samza.reset.offset"
+    val subConf = config.subset("systems.%s.streams." format systemName, true)
+    subConf
+      .filterKeys(k => k.endsWith(suffix))
+      .map(kv => {
+        // strip only the trailing suffix; replace() would also cut it out of the stream name
+        val systemStream = new SystemStream(systemName, kv._1.stripSuffix(suffix))
+        (systemStream, getResetOffset(systemStream))
+      }).toMap
+  }
+
+  // Only the literal "true" enables the reset; any other value, or no value, yields false.
+  def getResetOffset(systemStream: SystemStream) = {
+    val key = StreamConfig.CONSUMER_RESET_OFFSET format (systemStream.getSystem, systemStream.getStream)
+    getOption(key) match {
+      case Some("true") => true
+      case Some("false") => false
+      case Some(resetOffset) =>
+        warn("Got a configuration for %s that is not true, or false (was %s). Defaulting to false." format (key, resetOffset))
+        false
+      case _ => false
+    }
+  }
+
+  /**
+   * Returns a list of all SystemStreams that have a serde defined from the config file.
+   */
+  def getSerdeStreams(systemName: String) = {
+    val subConf = config.subset("systems.%s.streams." format systemName, true)
+    subConf.keys
+      .filter(k => k.endsWith(".samza.msg.serde") || k.endsWith(".samza.key.serde"))
+      .map(k => new SystemStream(systemName, k.substring(0, k.length - 16 /* .samza.XXX.serde length */ ))).toSet
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
new file mode 100644
index 0000000..ce63a8a
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import scala.collection.JavaConversions._
+
+import grizzled.slf4j.Logging
+
+object SystemConfig {
+  // system config constants; "%s" in a key is replaced with a system name
+  val SYSTEM_PREFIX = "systems.%s."
+  val SYSTEM_FACTORY = "systems.%s.samza.factory"
+  val KEY_SERDE = "systems.%s.samza.key.serde"
+  val MSG_SERDE = "systems.%s.samza.msg.serde"
+  // Importing this implicit lets a plain Config be used as a SystemConfig.
+  implicit def Config2System(config: Config): SystemConfig = new SystemConfig(config)
+}
+
+class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
+  def getSystemFactory(name: String) = getOption(SystemConfig.SYSTEM_FACTORY format name)
+
+  def getSystemKeySerde(name: String) = getOption(SystemConfig.KEY_SERDE format name)
+
+  def getSystemMsgSerde(name: String) = getOption(SystemConfig.MSG_SERDE format name)
+
+  /**
+   * Returns a list of all system names from the config file. Useful for
+   * getting individual systems.
+   */
+  def getSystemNames() = {
+    val subConf = config.subset("systems.", true)
+    // find all .samza.factory keys, and strip only that trailing suffix
+    subConf.keys.filter(k => k.endsWith(".samza.factory")).map(_.stripSuffix(".samza.factory"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
new file mode 100644
index 0000000..0c742d8
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.SamzaException
+import org.apache.samza.util.Util
+import org.apache.samza.system.SystemStream
+
+object TaskConfig {
+  // task config constants
+  val INPUT_STREAMS = "task.inputs" // streaming.input-streams
+  val WINDOW_MS = "task.window.ms" // window period in milliseconds
+  val COMMIT_MS = "task.commit.ms" // commit period in milliseconds
+  val TASK_CLASS = "task.class" // streaming.task-factory-class
+  val COMMAND_BUILDER = "task.command.class" // key read by getCommandClass
+  val LIFECYCLE_LISTENERS = "task.lifecycle.listeners" // li-generator,foo
+  val LIFECYCLE_LISTENER = "task.lifecycle.listener.%s.class" // task.lifecycle.listener.li-generator.class
+  val CHECKPOINT_MANAGER_FACTORY = "task.checkpoint.factory" // class name to use when sending offset checkpoints
+  val TASK_JMX_ENABLED = "task.jmx.enabled" // Start up a JMX server for this task?
+  // Importing this implicit lets a plain Config be used as a TaskConfig.
+  implicit def Config2Task(config: Config): TaskConfig = new TaskConfig(config)
+}
+
+class TaskConfig(config: Config) extends ScalaMapConfig(config) {
+  /**
+   * Splits the "task.inputs" value on commas and converts each piece with
+   * Util.getSystemStreamFromNames. A missing or empty value gives an empty set.
+   */
+  def getInputStreams = getOption(TaskConfig.INPUT_STREAMS)
+    .filter(_.length > 0)
+    .map(_.split(",").map(names => Util.getSystemStreamFromNames(names)).toSet)
+    .getOrElse(Set[SystemStream]())
+
+  /** Value of "task.window.ms" as a Long, if set; toLong throws on a non-numeric value. */
+  def getWindowMs: Option[Long] = getOption(TaskConfig.WINDOW_MS).map(ms => ms.toLong)
+
+  /** Value of "task.commit.ms" as a Long, if set; toLong throws on a non-numeric value. */
+  def getCommitMs: Option[Long] = getOption(TaskConfig.COMMIT_MS).map(ms => ms.toLong)
+
+  /** Raw value of "task.lifecycle.listeners", if set. */
+  def getLifecycleListeners(): Option[String] = getOption(TaskConfig.LIFECYCLE_LISTENERS)
+
+  /** Class configured under "task.lifecycle.listener.<name>.class", if set. */
+  def getLifecycleListenerClass(name: String): Option[String] = getOption(TaskConfig.LIFECYCLE_LISTENER.format(name))
+
+  /** Value of "task.class", if set. */
+  def getTaskClass = getOption(TaskConfig.TASK_CLASS)
+
+  /** Value of "task.command.class", if set. */
+  def getCommandClass = getOption(TaskConfig.COMMAND_BUILDER)
+
+  /** Value of "task.checkpoint.factory", if set. */
+  def getCheckpointManagerFactory() = getOption(TaskConfig.CHECKPOINT_MANAGER_FACTORY)
+
+  /** Reads "task.jmx.enabled" via getBoolean, passing true as the second argument. */
+  def getJmxServerEnabled = getBoolean(TaskConfig.TASK_JMX_ENABLED, true)
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/factories/PropertiesConfigFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/factories/PropertiesConfigFactory.scala b/samza-core/src/main/scala/org/apache/samza/config/factories/PropertiesConfigFactory.scala
new file mode 100644
index 0000000..768cfa4
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/factories/PropertiesConfigFactory.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config.factories
+import java.io.FileInputStream
+import java.net.URI
+import java.util.Properties
+import scala.collection.JavaConversions._
+import org.apache.samza.config.Config
+import org.apache.samza.config.ConfigFactory
+import org.apache.samza.config.MapConfig
+import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
+
+class PropertiesConfigFactory extends ConfigFactory with Logging {
+  // Loads the properties file at configUri's path into a MapConfig; throws SamzaException for a non-"file" scheme.
+  def getConfig(configUri: URI): Config = {
+    val scheme = configUri.getScheme
+    if (scheme != null && !scheme.equals("file")) {
+      throw new SamzaException("only the file:// scheme is supported for properties files")
+    }
+
+    val configPath = configUri.getPath
+    val props = new Properties()
+    val in = new FileInputStream(configPath)
+    // close in a finally so the file handle is released even when load throws
+    try props.load(in) finally in.close()
+
+    debug("got config %s from config %s" format (props, configPath))
+
+    new MapConfig(props.toMap[String, String])
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/serializers/JsonConfigSerializer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/serializers/JsonConfigSerializer.scala b/samza-core/src/main/scala/org/apache/samza/config/serializers/JsonConfigSerializer.scala
new file mode 100644
index 0000000..60e65ea
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/serializers/JsonConfigSerializer.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config.serializers
+import scala.collection.JavaConversions._
+
+import org.codehaus.jackson.map.ObjectMapper
+
+import org.apache.samza.config.Config
+import org.apache.samza.config.MapConfig
+
+import java.util.HashMap
+
+object JsonConfigSerializer {
+  val jsonMapper = new ObjectMapper()
+  // Reads `string` with jsonMapper as a HashMap and wraps the result in a MapConfig.
+  def fromJson(string: String): Config = {
+    val map = jsonMapper.readValue(string, classOf[HashMap[String, String]])
+    new MapConfig(map)
+  }
+  // Copies `config` into a new HashMap and writes that map out as a JSON string.
+  def toJson(config: Config) = jsonMapper.writeValueAsString(new HashMap[String, String](config))
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
new file mode 100644
index 0000000..2243b5c
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container
+
+import java.io.File
+import grizzled.slf4j.Logging
+import org.apache.samza.Partition
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.checkpoint.CheckpointManagerFactory
+import org.apache.samza.config.Config
+import org.apache.samza.config.MetricsConfig.Config2Metrics
+import org.apache.samza.config.SerializerConfig.Config2Serializer
+import org.apache.samza.config.ShellCommandConfig
+import org.apache.samza.config.StorageConfig.Config2Storage
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.config.serializers.JsonConfigSerializer
+import org.apache.samza.metrics.JmxServer
+import org.apache.samza.metrics.JvmMetrics
+import org.apache.samza.metrics.MetricsReporter
+import org.apache.samza.metrics.MetricsReporterFactory
+import org.apache.samza.serializers.Serde
+import org.apache.samza.serializers.SerdeFactory
+import org.apache.samza.serializers.SerdeManager
+import org.apache.samza.storage.StorageEngineFactory
+import org.apache.samza.storage.TaskStorageManager
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.task.StreamTask
+import org.apache.samza.task.TaskLifecycleListener
+import org.apache.samza.task.TaskLifecycleListenerFactory
+import org.apache.samza.util.Util
+import org.apache.samza.task.ReadableCoordinator
+import org.apache.samza.system.SystemProducers
+import org.apache.samza.task.ReadableCollector
+import org.apache.samza.system.DefaultPicker
+import org.apache.samza.system.SystemConsumers
+
+/**
+ * Entry point and factory for SamzaContainer. The main method reads the
+ * container's inputs from environment variables, and apply wires up every
+ * object a container needs (systems, serdes, metrics, checkpointing, stores
+ * and one TaskInstance per partition) from a Config.
+ */
+object SamzaContainer extends Logging {
+  /**
+   * Process entry point. Reads the container name, the JSON-serialized
+   * configuration and the comma-separated partition ids from the environment
+   * variables named in ShellCommandConfig, then builds and runs a container.
+   *
+   * @param args command line arguments; not read by this method.
+   * @throws SamzaException if the configuration or the partition assignment
+   *                        is missing from the environment.
+   */
+  def main(args: Array[String]) {
+    val jmxServer = new JmxServer
+
+    // Everything that follows the JMX server's construction runs inside the
+    // try, so the server is stopped even when reading the environment or
+    // wiring up the container fails, and not only when run itself fails.
+    try {
+      val containerName = System.getenv(ShellCommandConfig.ENV_TASK_NAME)
+      val configStr = System.getenv(ShellCommandConfig.ENV_CONFIG)
+
+      // System.getenv returns null for an undefined variable. Fail with a
+      // descriptive error rather than handing null to the JSON parser.
+      if (configStr == null) {
+        throw new SamzaException("No configuration found in environment variable %s. Can't run a container without a configuration." format ShellCommandConfig.ENV_CONFIG)
+      }
+
+      val config = JsonConfigSerializer.fromJson(configStr)
+      val partitionIdsCsv = System.getenv(ShellCommandConfig.ENV_PARTITION_IDS)
+      // An undefined variable (null) is treated the same as an empty list.
+      val partitions = if (partitionIdsCsv != null && partitionIdsCsv.length > 0) {
+        partitionIdsCsv.split(",")
+          .map(partitionIdStr => new Partition(partitionIdStr.toInt))
+          .toSet
+      } else {
+        throw new SamzaException("No partitions for this task. Can't run a task without partition assignments. It's likely that the partition manager for this system doesn't know about the stream you're trying to read.")
+      }
+
+      SamzaContainer(containerName, partitions, config).run
+    } finally {
+      jmxServer.stop
+    }
+  }
+
+  /**
+   * Builds a fully wired SamzaContainer from configuration.
+   *
+   * @param containerName name used for the container's metrics source and
+   *                      passed to the metrics reporter factories.
+   * @param partitions    partitions to run; one TaskInstance is created for
+   *                      each of them.
+   * @param config        job configuration that all factories, serdes, stores
+   *                      and timing settings are read from.
+   * @return a container that is ready to have run called on it.
+   * @throws SamzaException if a referenced system, serde, metrics reporter,
+   *                        lifecycle listener, storage factory or the task
+   *                        class is missing from the configuration.
+   */
+  def apply(containerName: String, partitions: Set[Partition], config: Config) = {
+    info("Setting up Samza container: %s" format containerName)
+    info("Using partitions: %s" format partitions)
+    info("Using configuration: %s" format config)
+
+    val samzaContainerMetrics = new SamzaContainerMetrics(containerName)
+
+    val inputStreams = config.getInputStreams
+    val inputSystems = inputStreams.map(_.getSystem)
+
+    info("Got input streams: %s" format inputStreams)
+
+    val systemNames = config.getSystemNames
+
+    info("Got system names: %s" format systemNames)
+
+    val resetInputStreams = systemNames.flatMap(systemName => {
+      config.getResetOffsetMap(systemName)
+    }).toMap
+
+    info("Got input stream resets: %s" format resetInputStreams)
+
+    val serdeStreams = systemNames.foldLeft(Set[SystemStream]())(_ ++ config.getSerdeStreams(_))
+
+    debug("Got serde streams: %s" format serdeStreams)
+
+    val serdeNames = config.getSerdeNames
+
+    info("Got serde names: %s" format serdeNames)
+
+    val systemFactories = systemNames.map(systemName => {
+      val systemFactoryClassName = config
+        .getSystemFactory(systemName)
+        .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
+      (systemName, Util.getObj[SystemFactory](systemFactoryClassName))
+    }).toMap
+
+    info("Got system factories: %s" format systemFactories.keys)
+
+    // Best effort: an input system whose factory fails to provide a consumer
+    // is logged and left out of the map instead of aborting container setup.
+    val consumers = inputSystems
+      .map(systemName => {
+        val systemFactory = systemFactories(systemName)
+
+        try {
+          (systemName, systemFactory.getConsumer(systemName, config, samzaContainerMetrics.registry))
+        } catch {
+          case e: Throwable =>
+            info("Failed to create a consumer for %s, so skipping." format systemName)
+            debug(e)
+            (systemName, null)
+        }
+      })
+      .filter(_._2 != null)
+      .toMap
+
+    info("Got system consumers: %s" format consumers.keys)
+
+    // Same best-effort policy as for consumers, applied to every configured
+    // system rather than only the input systems.
+    val producers = systemFactories
+      .map {
+        case (systemName, systemFactory) =>
+          try {
+            (systemName, systemFactory.getProducer(systemName, config, samzaContainerMetrics.registry))
+          } catch {
+            case e: Throwable =>
+              info("Failed to create a producer for %s, so skipping." format systemName)
+              debug(e)
+              (systemName, null)
+          }
+      }
+      .filter(_._2 != null)
+      .toMap
+
+    info("Got system producers: %s" format producers.keys)
+
+    val serdes = serdeNames.map(serdeName => {
+      val serdeClassName = config
+        .getSerdeClass(serdeName)
+        .getOrElse(throw new SamzaException("No class defined for serde: %s." format serdeName))
+
+      val serde = Util.getObj[SerdeFactory[Object]](serdeClassName)
+        .getSerde(serdeName, config)
+
+      (serdeName, serde)
+    }).toMap
+
+    info("Got serdes: %s" format serdes.keys)
+
+    /*
+     * A helper function to build a Map[String, Serde] (systemName -> Serde) for systems defined in the config. This is useful to build both key and message serde maps.
+     */
+    val buildSystemSerdeMap = (getSerdeName: (String) => Option[String]) => {
+      systemNames
+        .filter(getSerdeName(_).isDefined)
+        .map(systemName => {
+          val serdeName = getSerdeName(systemName).get
+          val serde = serdes.getOrElse(serdeName, throw new SamzaException("Serde %s for system %s does not exist in configuration." format (serdeName, systemName)))
+          (systemName, serde)
+        }).toMap
+    }
+
+    /*
+     * A helper function to build a Map[SystemStream, Serde] for streams defined in the config. This is useful to build both key and message serde maps.
+     */
+    val buildSystemStreamSerdeMap = (getSerdeName: (SystemStream) => Option[String]) => {
+      (serdeStreams ++ inputStreams)
+        .filter(systemStream => getSerdeName(systemStream).isDefined)
+        .map(systemStream => {
+          val serdeName = getSerdeName(systemStream).get
+          val serde = serdes.getOrElse(serdeName, throw new SamzaException("Serde %s for system %s does not exist in configuration." format (serdeName, systemStream)))
+          (systemStream, serde)
+        }).toMap
+    }
+
+    val systemKeySerdes = buildSystemSerdeMap((systemName: String) => config.getSystemKeySerde(systemName))
+
+    debug("Got system key serdes: %s" format systemKeySerdes)
+
+    val systemMessageSerdes = buildSystemSerdeMap((systemName: String) => config.getSystemMsgSerde(systemName))
+
+    debug("Got system message serdes: %s" format systemMessageSerdes)
+
+    val systemStreamKeySerdes = buildSystemStreamSerdeMap((systemStream: SystemStream) => config.getStreamKeySerde(systemStream))
+
+    debug("Got system stream key serdes: %s" format systemStreamKeySerdes)
+
+    val systemStreamMessageSerdes = buildSystemStreamSerdeMap((systemStream: SystemStream) => config.getStreamMsgSerde(systemStream))
+
+    debug("Got system stream message serdes: %s" format systemStreamMessageSerdes)
+
+    // Store name -> changelog SystemStream, for the stores that define one.
+    val changeLogSystemStreams = config
+      .getStoreNames
+      .filter(config.getChangelogStream(_).isDefined)
+      .map(name => (name, config.getChangelogStream(name).get)).toMap
+      .mapValues(Util.getSystemStreamFromNames(_))
+
+    info("Got change log system streams: %s" format changeLogSystemStreams)
+
+    val serdeManager = new SerdeManager(
+      serdes = serdes,
+      systemKeySerdes = systemKeySerdes,
+      systemMessageSerdes = systemMessageSerdes,
+      systemStreamKeySerdes = systemStreamKeySerdes,
+      systemStreamMessageSerdes = systemStreamMessageSerdes,
+      changeLogSystemStreams = changeLogSystemStreams.values.toSet)
+
+    info("Setting up JVM metrics.")
+
+    val jvm = new JvmMetrics(samzaContainerMetrics.registry)
+
+    info("Setting up incoming message envelope picker.")
+
+    val picker = new DefaultPicker
+
+    info("Setting up metrics reporters.")
+
+    val reporters = config.getMetricReporterNames.map(reporterName => {
+      val metricsFactoryClassName = config
+        .getMetricsFactoryClass(reporterName)
+        .getOrElse(throw new SamzaException("Metrics reporter %s missing .class config" format reporterName))
+
+      val reporter =
+        Util
+          .getObj[MetricsReporterFactory](metricsFactoryClassName)
+          .getMetricsReporter(reporterName, containerName, config)
+      (reporterName, reporter)
+    }).toMap
+
+    info("Got metrics reporters: %s" format reporters.keys)
+
+    // The checkpoint manager is optional: null means no factory is configured.
+    val checkpointManager = config.getCheckpointManagerFactory match {
+      case Some(checkpointFactoryClassName) =>
+        Util
+          .getObj[CheckpointManagerFactory](checkpointFactoryClassName)
+          .getCheckpointManager(config, samzaContainerMetrics.registry)
+      case _ => null
+    }
+
+    info("Got checkpoint manager: %s" format checkpointManager)
+
+    val consumerMultiplexer = new SystemConsumers(
+      // TODO add config values for no new message timeout and max msgs per stream partition
+      picker = picker,
+      consumers = consumers,
+      serdeManager = serdeManager)
+
+    val producerMultiplexer = new SystemProducers(
+      producers = producers,
+      serdeManager = serdeManager)
+
+    val listeners = config.getLifecycleListeners match {
+      case Some(listenerNames) => {
+        listenerNames.split(",").map(listenerName => {
+          info("Loading lifecycle listener: %s" format listenerName)
+
+          val listenerClassName = config.getLifecycleListenerClass(listenerName).getOrElse(throw new SamzaException("Referencing missing listener %s in config" format listenerName))
+
+          Util.getObj[TaskLifecycleListenerFactory](listenerClassName)
+            .getLifecyleListener(listenerName, config)
+        }).toList
+      }
+      case _ => {
+        info("No lifecycle listeners found")
+
+        List[TaskLifecycleListener]()
+      }
+    }
+
+    // TODO not sure how we should make this config based, or not. Kind of
+    // strange, since it has some dynamic directories when used with YARN.
+    val storeBaseDir = new File(System.getProperty("user.dir"), "state")
+
+    info("Got storage engine base directory: %s" format storeBaseDir)
+
+    val storageEngineFactories = config
+      .getStoreNames
+      .map(storeName => {
+        val storageFactoryClassName = config
+          .getStorageFactoryClassName(storeName)
+          .getOrElse(throw new SamzaException("Missing storage factory for %s." format storeName))
+        (storeName, Util.getObj[StorageEngineFactory[Object, Object]](storageFactoryClassName))
+      }).toMap
+
+    info("Got storage engines: %s" format storageEngineFactories.keys)
+
+    val taskClassName = config
+      .getTaskClass
+      .getOrElse(throw new SamzaException("No task class defined in configuration."))
+
+    info("Got stream task class: %s" format taskClassName)
+
+    // -1 is the fallback when no window interval is configured.
+    val taskWindowMs = config.getWindowMs.getOrElse(-1L)
+
+    info("Got window milliseconds: %s" format taskWindowMs)
+
+    // 60000 ms is the fallback when no commit interval is configured.
+    val taskCommitMs = config.getCommitMs.getOrElse(60000L)
+
+    info("Got commit milliseconds: %s" format taskCommitMs)
+
+    // Wire up all task-level (unshared) objects.
+    val taskInstances = partitions.map(partition => {
+      debug("Setting up task instance: %s" format partition)
+
+      val task = Util.getObj[StreamTask](taskClassName)
+
+      val collector = new ReadableCollector
+
+      val taskInstanceMetrics = new TaskInstanceMetrics(partition)
+
+      // One changelog consumer per store, created per task instance.
+      val storeConsumers = changeLogSystemStreams
+        .map {
+          case (storeName, changeLogSystemStream) =>
+            val systemConsumer = systemFactories
+              .getOrElse(changeLogSystemStream.getSystem, throw new SamzaException("Changelog system %s for store %s does not exist in the config." format (changeLogSystemStream, storeName)))
+              .getConsumer(changeLogSystemStream.getSystem, config, taskInstanceMetrics.registry)
+            (storeName, systemConsumer)
+        }.toMap
+
+      info("Got store consumers: %s" format storeConsumers)
+
+      val taskStores = storageEngineFactories
+        .map {
+          case (storeName, storageEngineFactory) =>
+            // null when the store has no changelog stream configured.
+            val changeLogSystemStreamPartition = if (changeLogSystemStreams.contains(storeName)) {
+              new SystemStreamPartition(changeLogSystemStreams(storeName), partition)
+            } else {
+              null
+            }
+            // A store serde that is named but not defined is reported as a
+            // SamzaException, like the system and stream serde lookups above,
+            // instead of surfacing as a bare NoSuchElementException.
+            val keySerde = config.getStorageKeySerde(storeName) match {
+              case Some(keySerdeName) =>
+                serdes.getOrElse(keySerdeName, throw new SamzaException("Serde %s for store %s does not exist in configuration." format (keySerdeName, storeName)))
+              case _ => null
+            }
+            val msgSerde = config.getStorageMsgSerde(storeName) match {
+              case Some(msgSerdeName) =>
+                serdes.getOrElse(msgSerdeName, throw new SamzaException("Serde %s for store %s does not exist in configuration." format (msgSerdeName, storeName)))
+              case _ => null
+            }
+            val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, partition)
+            val storageEngine = storageEngineFactory.getStorageEngine(
+              storeName,
+              storePartitionDir,
+              keySerde,
+              msgSerde,
+              collector,
+              config,
+              taskInstanceMetrics.registry,
+              changeLogSystemStreamPartition)
+            (storeName, storageEngine)
+        }
+
+      info("Got task stores: %s" format taskStores)
+
+      val storageManager = new TaskStorageManager(
+        partition = partition,
+        taskStores = taskStores,
+        storeConsumers = storeConsumers,
+        changeLogSystemStreams = changeLogSystemStreams,
+        storeBaseDir = storeBaseDir)
+
+      val taskInstance = new TaskInstance(
+        task = task,
+        partition = partition,
+        config = config,
+        metrics = taskInstanceMetrics,
+        consumerMultiplexer = consumerMultiplexer,
+        producerMultiplexer = producerMultiplexer,
+        storageManager = storageManager,
+        checkpointManager = checkpointManager,
+        reporters = reporters,
+        listeners = listeners,
+        inputStreams = inputStreams,
+        resetInputStreams = resetInputStreams,
+        windowMs = taskWindowMs,
+        commitMs = taskCommitMs,
+        collector = collector)
+
+      (partition, taskInstance)
+    }).toMap
+
+    info("Samza container setup complete.")
+
+    new SamzaContainer(
+      taskInstances = taskInstances,
+      config = config,
+      consumerMultiplexer = consumerMultiplexer,
+      producerMultiplexer = producerMultiplexer,
+      checkpointManager = checkpointManager,
+      metrics = samzaContainerMetrics,
+      reporters = reporters,
+      jvm = jvm)
+  }
+}
+
+/**
+ * Drives a set of task instances: starts the shared components, repeatedly
+ * runs process / window / send / commit until a shutdown is requested or an
+ * exception is thrown, and then shuts the components down again.
+ *
+ * @param taskInstances       task instances keyed by the partition they own.
+ * @param config              job configuration; not read by any method of
+ *                            this class.
+ * @param consumerMultiplexer source of incoming message envelopes.
+ * @param producerMultiplexer sink that task instances register with.
+ * @param checkpointManager   optional checkpoint manager; null disables
+ *                            checkpoint start and stop.
+ * @param metrics             container-level metrics given to the reporters.
+ * @param reporters           metrics reporters keyed by name.
+ * @param jvm                 optional JVM metrics; null skips start and stop.
+ */
+class SamzaContainer(
+  taskInstances: Map[Partition, TaskInstance],
+  config: Config,
+  consumerMultiplexer: SystemConsumers,
+  producerMultiplexer: SystemProducers,
+  checkpointManager: CheckpointManager = null,
+  metrics: SamzaContainerMetrics = new SamzaContainerMetrics,
+  reporters: Map[String, MetricsReporter] = Map(),
+  jvm: JvmMetrics = null) extends Runnable with Logging {
+
+  /**
+   * Starts every component, then loops until a task requests shutdown through
+   * the coordinator. Exceptions from the loop are logged and rethrown. The
+   * shutdown sequence runs once the loop has been entered, however it ends.
+   */
+  def run {
+    info("Entering run loop.")
+
+    startMetrics
+    startCheckpoints
+    startStores
+    startTask
+    startProducers
+    startConsumers
+
+    try {
+      var stopRequested = false
+
+      while (!stopRequested) {
+        // A fresh coordinator per iteration, so requests do not carry over.
+        val coordinator = new ReadableCoordinator
+
+        process(coordinator)
+        window(coordinator)
+        send
+        commit(coordinator)
+
+        stopRequested = coordinator.shutdownRequested
+
+        if (stopRequested) {
+          info("Shutdown requested.")
+        }
+      }
+    } catch {
+      case e: Throwable =>
+        error("Caught exception in process loop.", e)
+        throw e
+    } finally {
+      info("Shutting down.")
+
+      // Components are stopped in the reverse of their start order.
+      shutdownConsumers
+      shutdownProducers
+      shutdownTask
+      shutdownStores
+      shutdownCheckpoints
+      shutdownMetrics
+
+      info("Shutdown complete.")
+    }
+  }
+
+  /** Registers task and container metrics with the reporters and starts them. */
+  def startMetrics {
+    info("Registering task instances with metrics.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.registerMetrics
+    }
+
+    info("Starting JVM metrics.")
+
+    Option(jvm).foreach(_.start)
+
+    info("Starting metrics reporters.")
+
+    for (reporter <- reporters.values) {
+      reporter.register(metrics.source, metrics.registry)
+      reporter.start
+    }
+  }
+
+  /** Registers every task instance for checkpointing and starts the manager, if any. */
+  def startCheckpoints {
+    info("Registering task instances with checkpoints.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.registerCheckpoints
+    }
+
+    if (checkpointManager == null) {
+      warn("No checkpoint manager defined. No consumer offsets will be maintained for this job.")
+    } else {
+      info("Registering checkpoint manager.")
+
+      checkpointManager.start
+    }
+  }
+
+  /** Starts the stores of every task instance. */
+  def startStores {
+    info("Starting task instance stores.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.startStores
+    }
+  }
+
+  /** Initializes the stream task of every task instance. */
+  def startTask {
+    info("Initializing stream tasks.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.initTask
+    }
+  }
+
+  /** Registers every task instance with the producers, then starts the producer multiplexer. */
+  def startProducers {
+    info("Registering task instances with producers.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.registerProducers
+    }
+
+    info("Starting producer multiplexer.")
+
+    producerMultiplexer.start
+  }
+
+  /** Registers every task instance with the consumers, then starts the consumer multiplexer. */
+  def startConsumers {
+    info("Registering task instances with consumers.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.registerConsumers
+    }
+
+    info("Starting consumer multiplexer.")
+
+    consumerMultiplexer.start
+  }
+
+  /**
+   * Picks at most one envelope from the consumer multiplexer and hands it to
+   * the task instance that owns the envelope's partition.
+   *
+   * @param coordinator coordinator passed through to the task instance.
+   */
+  def process(coordinator: ReadableCoordinator) {
+    trace("Attempting to pick a message to process.")
+
+    val envelope = consumerMultiplexer.pick
+
+    if (envelope == null) {
+      trace("No incoming message envelope was available.")
+    } else {
+      val partition = envelope.getSystemStreamPartition.getPartition
+
+      trace("Processing incoming message envelope for partition %s." format partition)
+
+      taskInstances(partition).process(envelope, coordinator)
+    }
+  }
+
+  /** Calls window on every task instance; each instance decides whether to act. */
+  def window(coordinator: ReadableCoordinator) {
+    trace("Windowing stream tasks.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.window(coordinator)
+    }
+  }
+
+  /** Calls send on every task instance. */
+  def send {
+    trace("Triggering send in task instances.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.send
+    }
+  }
+
+  /** Calls commit on every task instance; each instance decides whether to act. */
+  def commit(coordinator: ReadableCoordinator) {
+    trace("Committing task instances.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.commit(coordinator)
+    }
+  }
+
+  /** Stops the consumer multiplexer. */
+  def shutdownConsumers {
+    info("Shutting down consumer multiplexer.")
+
+    consumerMultiplexer.stop
+  }
+
+  /** Stops the producer multiplexer. */
+  def shutdownProducers {
+    info("Shutting down producer multiplexer.")
+
+    producerMultiplexer.stop
+  }
+
+  /** Shuts down the stream task of every task instance. */
+  def shutdownTask {
+    info("Shutting down task instance stream tasks.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.shutdownTask
+    }
+  }
+
+  /** Shuts down the stores of every task instance. */
+  def shutdownStores {
+    info("Shutting down task instance stores.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.shutdownStores
+    }
+  }
+
+  /** Stops the checkpoint manager when one was supplied. */
+  def shutdownCheckpoints {
+    if (checkpointManager == null) {
+      info("No checkpoint manager defined, so skipping checkpoint manager stop.")
+    } else {
+      info("Shutting down checkpoint manager.")
+      checkpointManager.stop
+    }
+  }
+
+  /** Stops every metrics reporter, then the JVM metrics when supplied. */
+  def shutdownMetrics {
+    info("Shutting down metrics reporters.")
+
+    for (reporter <- reporters.values) {
+      reporter.stop
+    }
+
+    if (jvm != null) {
+      info("Shutting down JVM metrics.")
+
+      jvm.stop
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
new file mode 100644
index 0000000..81cf356
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container
+
+import org.apache.samza.metrics.ReadableMetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
+
+/**
+ * Container-level metrics holder.
+ *
+ * @param containerName name of the container; also exposed as the metrics
+ *                      source. Defaults to "unnamed-container".
+ * @param registry      registry the counters are created in; a fresh
+ *                      MetricsRegistryMap by default.
+ */
+class SamzaContainerMetrics(
+  val containerName: String = "unnamed-container",
+  val registry: ReadableMetricsRegistry = new MetricsRegistryMap) {
+
+  /** Source name the reporters register this registry under. */
+  val source: String = containerName
+
+  /** Counter named "commits" in the "samza.task.SamzaContainer" group. */
+  val commits = registry.newCounter("samza.task.SamzaContainer", "commits")
+
+  // TODO .. etc ..
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
new file mode 100644
index 0000000..5fc9316
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container
+
+import org.apache.samza.metrics.MetricsReporter
+import org.apache.samza.config.Config
+import org.apache.samza.Partition
+import grizzled.slf4j.Logging
+import scala.collection.JavaConversions._
+import org.apache.samza.storage.TaskStorageManager
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.task.TaskContext
+import org.apache.samza.task.ClosableTask
+import org.apache.samza.task.InitableTask
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.task.WindowableTask
+import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.task.TaskLifecycleListener
+import org.apache.samza.task.StreamTask
+import org.apache.samza.system.SystemStream
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.task.ReadableCollector
+import org.apache.samza.system.SystemConsumers
+import org.apache.samza.system.SystemProducers
+import org.apache.samza.task.ReadableCoordinator
+
+/**
+ * Wraps a single StreamTask bound to one partition and drives its lifecycle:
+ * registration with metrics, checkpoints, producers and consumers, then
+ * process / window / send / commit, and finally shutdown.
+ *
+ * @param task                the user task to run.
+ * @param partition           the partition this instance is responsible for.
+ * @param config              job configuration handed to the task and listeners.
+ * @param metrics             per-task metrics; its source also identifies
+ *                            this instance to the producer multiplexer.
+ * @param consumerMultiplexer consumers that input streams are registered with.
+ * @param producerMultiplexer producers that collected envelopes are sent to.
+ * @param storageManager      optional storage manager; null disables store
+ *                            start, flush and stop.
+ * @param checkpointManager   optional checkpoint manager; null disables
+ *                            reading and writing of checkpoints.
+ * @param reporters           metrics reporters keyed by name.
+ * @param listeners           lifecycle listeners called around init, process
+ *                            and close.
+ * @param inputStreams        streams to register with the consumers for this
+ *                            partition.
+ * @param resetInputStreams   streams whose checkpointed offset is ignored
+ *                            when the mapped value is true.
+ * @param queueSize           not read by any method of this class.
+ * @param windowMs            minimum milliseconds between window calls; a
+ *                            negative value disables windowing.
+ * @param commitMs            minimum milliseconds between time-based commits.
+ * @param clock               source of the current time in milliseconds.
+ * @param collector           collector handed to the task; its envelopes are
+ *                            forwarded by send.
+ */
+class TaskInstance(
+  task: StreamTask,
+  partition: Partition,
+  config: Config,
+  metrics: TaskInstanceMetrics,
+  consumerMultiplexer: SystemConsumers,
+  producerMultiplexer: SystemProducers,
+  storageManager: TaskStorageManager = null,
+  checkpointManager: CheckpointManager = null,
+  reporters: Map[String, MetricsReporter] = Map(),
+  listeners: Seq[TaskLifecycleListener] = Seq(),
+  inputStreams: Set[SystemStream] = Set(),
+  resetInputStreams: Map[SystemStream, Boolean] = Map(),
+  queueSize: Int = 1000,
+  windowMs: Long = -1,
+  commitMs: Long = 60000,
+  clock: () => Long = { System.currentTimeMillis },
+  collector: ReadableCollector = new ReadableCollector) extends Logging {
+
+  // Last known offset per stream; seeded from the checkpoint in
+  // registerConsumers and updated after every processed envelope.
+  var offsets = Map[SystemStream, String]()
+  var lastWindowMs = 0L
+  var lastCommitMs = 0L
+  // The optional task interfaces are checked once, up front.
+  val isInitableTask = task.isInstanceOf[InitableTask]
+  val isWindowableTask = task.isInstanceOf[WindowableTask]
+  val isClosableTask = task.isInstanceOf[ClosableTask]
+  // Context handed to the task and to lifecycle listeners.
+  val context = new TaskContext {
+    def getMetricsRegistry = metrics.registry
+    def getPartition = partition
+    def getStore(storeName: String) = if (storageManager != null) {
+      storageManager(storeName)
+    } else {
+      warn("No store found for name: %s" format storeName)
+
+      null
+    }
+  }
+
+  /** Registers this task's metrics registry with every reporter. */
+  def registerMetrics {
+    debug("Registering metrics for partition: %s." format partition)
+
+    reporters.values.foreach(_.register(metrics.source, metrics.registry))
+  }
+
+  /** Registers this partition with the checkpoint manager, when one is present. */
+  def registerCheckpoints {
+    if (checkpointManager != null) {
+      debug("Registering checkpoint manager for partition: %s." format partition)
+
+      checkpointManager.register(partition)
+    } else {
+      debug("Skipping checkpoint manager registration for partition: %s." format partition)
+    }
+  }
+
+  /** Initializes the storage manager, when one is present. */
+  def startStores {
+    if (storageManager != null) {
+      debug("Starting storage manager for partition: %s." format partition)
+
+      storageManager.init(collector)
+    } else {
+      debug("Skipping storage manager initialization for partition: %s." format partition)
+    }
+  }
+
+  /** Calls init on the task if it is an InitableTask, surrounded by the listener callbacks. */
+  def initTask {
+    listeners.foreach(_.beforeInit(config, context))
+
+    if (isInitableTask) {
+      debug("Initializing task for partition: %s." format partition)
+
+      task.asInstanceOf[InitableTask].init(config, context)
+    } else {
+      debug("Skipping task initialization for partition: %s." format partition)
+    }
+
+    listeners.foreach(_.afterInit(config, context))
+  }
+
+  /** Registers this task's metrics source with the producer multiplexer. */
+  def registerProducers {
+    debug("Registering producers for partition: %s." format partition)
+
+    producerMultiplexer.register(metrics.source)
+  }
+
+  /**
+   * Loads the last checkpoint (when a checkpoint manager is present), keeping
+   * offsets only for streams not configured to reset, and then registers each
+   * input stream for this partition with the consumers. Streams without a
+   * known offset are registered with a null offset.
+   */
+  def registerConsumers {
+    if (checkpointManager != null) {
+      debug("Loading checkpoints for partition: %s." format partition)
+
+      val checkpoint = checkpointManager.readLastCheckpoint(partition)
+
+      if (checkpoint != null) {
+        for ((systemStream, offset) <- checkpoint.getOffsets) {
+          if (!resetInputStreams.getOrElse(systemStream, false)) {
+            offsets += systemStream -> offset
+          } else {
+            info("Got offset %s for %s, but ignoring, since stream was configured to reset offsets." format (offset, systemStream))
+          }
+        }
+
+        info("Successfully loaded offsets for partition: %s, %s" format (partition, offsets))
+      } else {
+        warn("No checkpoint found for partition: %s. This is allowed if this is your first time running the job, but if it's not, you've probably lost data." format partition)
+      }
+    }
+
+    debug("Registering consumers for partition: %s." format partition)
+
+    inputStreams.foreach(stream =>
+      consumerMultiplexer.register(
+        new SystemStreamPartition(stream, partition),
+        offsets.get(stream).getOrElse(null)))
+  }
+
+  /**
+   * Passes one envelope to the task, surrounded by the listener callbacks, and
+   * records the envelope's offset afterwards.
+   *
+   * @param envelope    the incoming message to process.
+   * @param coordinator coordinator the task may use to request commit or shutdown.
+   */
+  def process(envelope: IncomingMessageEnvelope, coordinator: ReadableCoordinator) {
+    listeners.foreach(_.beforeProcess(envelope, config, context))
+
+    trace("Processing incoming message envelope for partition: %s, %s" format (partition, envelope.getSystemStreamPartition))
+
+    task.process(envelope, collector, coordinator)
+
+    listeners.foreach(_.afterProcess(envelope, config, context))
+
+    trace("Updating offset map for partition: %s, %s, %s" format (partition, envelope.getSystemStreamPartition, envelope.getOffset))
+
+    // NOTE(review): the key stored here is the envelope's
+    // SystemStreamPartition, while offsets is declared with SystemStream keys
+    // and registerConsumers looks entries up by SystemStream. Confirm that the
+    // two compare equal as map keys.
+    offsets += envelope.getSystemStreamPartition -> envelope.getOffset
+  }
+
+  /**
+   * Calls window on the task when it is a WindowableTask, windowing is enabled
+   * (windowMs >= 0) and more than windowMs milliseconds have passed since the
+   * last window call.
+   */
+  def window(coordinator: ReadableCoordinator) {
+    if (isWindowableTask && windowMs >= 0 && lastWindowMs + windowMs < clock()) {
+      trace("Windowing for partition: %s" format partition)
+
+      task.asInstanceOf[WindowableTask].window(collector, coordinator)
+      lastWindowMs = clock()
+
+      trace("Assigned last window time for partition: %s, %s" format (partition, lastWindowMs))
+    } else {
+      trace("Skipping window for partition: %s" format partition)
+    }
+  }
+
+  /** Forwards every collected envelope to the producers and resets the collector. */
+  def send {
+    if (collector.envelopes.size > 0) {
+      trace("Sending messages for partition: %s, %s" format (partition, collector.envelopes.size))
+
+      collector.envelopes.foreach(envelope => producerMultiplexer.send(metrics.source, envelope))
+
+      trace("Resetting collector for partition: %s" format partition)
+
+      collector.reset
+    } else {
+      trace("Skipping send for partition %s because no messages were collected." format partition)
+    }
+  }
+
+  /**
+   * Flushes stores, commits producers and writes a checkpoint when the commit
+   * interval has elapsed or the coordinator requested a commit or shutdown.
+   */
+  def commit(coordinator: ReadableCoordinator) {
+    if (lastCommitMs + commitMs < clock() || coordinator.isCommitRequested || coordinator.isShutdownRequested) {
+      // storageManager defaults to null, so guard the flush in the same way
+      // as startStores and shutdownStores to avoid a NullPointerException.
+      if (storageManager != null) {
+        trace("Flushing state stores for partition: %s" format partition)
+
+        storageManager.flush
+      } else {
+        trace("Skipping state store flush for partition: %s" format partition)
+      }
+
+      trace("Committing producers for partition: %s" format partition)
+
+      producerMultiplexer.commit(metrics.source)
+
+      if (checkpointManager != null) {
+        trace("Committing checkpoint manager for partition: %s" format partition)
+
+        checkpointManager.writeCheckpoint(partition, new Checkpoint(offsets))
+      }
+
+      lastCommitMs = clock()
+    }
+  }
+
+  /** Calls close on the task if it is a ClosableTask, surrounded by the listener callbacks. */
+  def shutdownTask {
+    listeners.foreach(_.beforeClose(config, context))
+
+    // Uses the precomputed flag, like isInitableTask and isWindowableTask.
+    if (isClosableTask) {
+      debug("Shutting down stream task for partition: %s" format partition)
+
+      task.asInstanceOf[ClosableTask].close
+    } else {
+      debug("Skipping stream task shutdown for partition: %s" format partition)
+    }
+
+    listeners.foreach(_.afterClose(config, context))
+  }
+
+  /** Stops the storage manager, when one is present. */
+  def shutdownStores {
+    if (storageManager != null) {
+      debug("Shutting down storage manager for partition: %s" format partition)
+
+      storageManager.stop
+    } else {
+      debug("Skipping storage manager shutdown for partition: %s" format partition)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
new file mode 100644
index 0000000..07d72c7
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container
+
+import org.apache.samza.metrics.ReadableMetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.Partition
+
+/**
+ * Metrics holder for a single task instance.
+ *
+ * @param partition partition whose ID is used to build the metrics source name
+ * @param registry registry in which the counters are created; a new
+ *                 MetricsRegistryMap is used when none is supplied
+ */
+class TaskInstanceMetrics(
+  val partition: Partition,
+  val registry: ReadableMetricsRegistry = new MetricsRegistryMap) {
+
+  // Source name of the form "Partition-<partition id>".
+  val source = "Partition-%s".format(partition.getPartitionId)
+
+  // Counter named "commits" in the "samza.task.TaskInstance" group.
+  val commits = registry.newCounter("samza.task.TaskInstance", "commits")
+
+  // TODO .. etc ..
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
new file mode 100644
index 0000000..f3a75af
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job
+
+import java.lang.String
+import java.net.URI
+import org.apache.samza.job.ApplicationStatus.Running
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config._
+import grizzled.slf4j.Logging
+import joptsimple.OptionParser
+import joptsimple.util.KeyValuePair
+import org.apache.samza.SamzaException
+import org.apache.samza.config.factories.PropertiesConfigFactory
+import scala.Some
+import org.apache.samza.util.Util
+import scala.collection.mutable.Buffer
+import scala.collection.JavaConversions._
+
+/**
+ * Command line entry point. Parses the --config-factory, --config-path and
+ * --config options, combines the resulting configs, and runs a JobRunner
+ * with the combined config.
+ */
+object JobRunner extends Logging {
+  def main(args: Array[String]) {
+    // Describe the accepted command line options.
+    val optionParser = new OptionParser()
+    val factoryOption =
+      optionParser.accepts("config-factory", "The config factory to use to read your config file.")
+        .withRequiredArg
+        .ofType(classOf[java.lang.String])
+        .describedAs("com.foo.bar.ClassName")
+        .defaultsTo(classOf[PropertiesConfigFactory].getName)
+    val pathOption =
+      optionParser.accepts("config-path", "URI location to a config file (e.g. file:///some/local/path.properties). " +
+        "If multiple files are given they are all used with later files overriding any values that appear in earlier files.")
+        .withRequiredArg
+        .ofType(classOf[URI])
+        .describedAs("path")
+    val overrideOption =
+      optionParser.accepts("config", "A configuration value in the form key=value. Command line properties override any configuration values given.")
+        .withRequiredArg
+        .ofType(classOf[KeyValuePair])
+        .describedAs("key=value")
+    val parsedOptions = optionParser.parse(args: _*)
+
+    // At least one config path is mandatory: print usage and exit without it.
+    if (!parsedOptions.has(pathOption)) {
+      optionParser.printHelpOn(System.err)
+      System.exit(-1)
+    }
+
+    // Read the option values and instantiate the config factory.
+    val factoryClassName = parsedOptions.valueOf(factoryOption)
+    val paths = parsedOptions.valuesOf(pathOption)
+    val factory = Class.forName(factoryClassName).newInstance.asInstanceOf[ConfigFactory]
+    val overrides = parsedOptions.valuesOf(overrideOption).map(kv => (kv.key, kv.value)).toMap
+
+    // One config per path, in the order given, followed by the command line
+    // overrides as the last entry.
+    val layeredConfigs: Buffer[java.util.Map[String, String]] = paths.map(factory.getConfig)
+    layeredConfigs += overrides
+
+    new JobRunner(new MapConfig(layeredConfigs)).run
+  }
+}
+
+/**
+ * JobRunner is a helper class that sets up and executes a Samza job based
+ * on a Config. The config is first passed through any configured config
+ * rewriters, then the configured StreamJobFactory is instantiated and used
+ * to create and submit the job.
+ */
+class JobRunner(config: Config) extends Logging with Runnable {
+
+  /**
+   * Rewrites the config, submits the job created by the configured job
+   * factory, and waits up to 500ms for the job to report Running.
+   *
+   * Throws a SamzaException when no job factory class is configured.
+   */
+  def run() {
+    val rewrittenConfig = rewriteConfig(config)
+
+    val factoryClassName = rewrittenConfig
+      .getStreamJobFactoryClass
+      .getOrElse(throw new SamzaException("no job factory class defined"))
+
+    val factory = Class.forName(factoryClassName).newInstance.asInstanceOf[StreamJobFactory]
+
+    info("job factory: %s" format (factoryClassName))
+    debug("config: %s" format (rewrittenConfig))
+
+    // Create the actual job, and submit it.
+    val submittedJob = factory.getJob(rewrittenConfig).submit
+
+    info("waiting for job to start")
+
+    // Wait until the job has started, then exit. The returned status is
+    // null-checked because a job may not have a status to report.
+    val reportedStatus = submittedJob.waitForStatus(Running, 500)
+
+    if (reportedStatus == null) {
+      warn("unable to start job successfully.")
+    } else if (Running.equals(reportedStatus)) {
+      info("job started successfully")
+    } else {
+      warn("unable to start job successfully. job has status %s" format (reportedStatus))
+    }
+
+    info("exiting")
+  }
+
+  /**
+   * Applies any and all config re-writer classes that the user has
+   * specified, in the order they are listed (comma-separated), each one
+   * receiving the output of the previous one. Returns the config unchanged
+   * when no rewriters are configured.
+   */
+  def rewriteConfig(config: Config): Config = {
+    // Applies a single named rewriter to the config accumulated so far. The
+    // rewriter's class name is looked up in the original, un-rewritten config.
+    def applyRewriter(current: Config, rewriterName: String): Config = {
+      val rewriterClassName = config
+        .getConfigRewriterClass(rewriterName)
+        .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
+      val rewriter = Util.getObj[ConfigRewriter](rewriterClassName)
+      info("Re-writing config file with " + rewriter)
+      rewriter.rewrite(rewriterName, current)
+    }
+
+    config.getConfigRewriters
+      .map(names => names.split(",").foldLeft(config)((current, name) => applyRewriter(current, name)))
+      .getOrElse(config)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
new file mode 100644
index 0000000..f55ca4c
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job
+
+import scala.collection.JavaConversions._
+import org.apache.samza.config.ShellCommandConfig
+import org.apache.samza.config.ShellCommandConfig.Config2ShellCommand
+import org.apache.samza.config.serializers.JsonConfigSerializer
+
+/**
+ * CommandBuilder that takes its command from the config and hands the task
+ * name, partition IDs, JSON-serialized config and task opts to that command
+ * through environment variables.
+ */
+class ShellCommandBuilder extends CommandBuilder {
+  def buildCommand() = config.getCommand
+
+  def buildEnvironment(): java.util.Map[String, String] = {
+    // Comma-separated partition IDs; the empty string when there are no partitions.
+    val partitionIds = partitions.map(_.getPartitionId.toString).mkString(",")
+
+    Map(
+      ShellCommandConfig.ENV_TASK_NAME -> name,
+      ShellCommandConfig.ENV_PARTITION_IDS -> partitionIds,
+      ShellCommandConfig.ENV_CONFIG -> JsonConfigSerializer.toJson(config),
+      ShellCommandConfig.ENV_SAMZA_OPTS -> config.getTaskOpts.getOrElse(""))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
new file mode 100644
index 0000000..ddb119b
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.local
+import org.apache.samza.config.TaskConfig._
+import org.apache.samza.config.Config
+import org.apache.samza.config.SystemConfig._
+import org.apache.samza.config.StreamConfig._
+import org.apache.samza.config.ShellCommandConfig._
+import org.apache.samza.job.CommandBuilder
+import org.apache.samza.job.StreamJob
+import org.apache.samza.job.StreamJobFactory
+import scala.collection.JavaConversions._
+import org.apache.samza.Partition
+import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
+import org.apache.samza.container.SamzaContainer
+import org.apache.samza.util.Util
+import org.apache.samza.job.ShellCommandBuilder
+
+/**
+ * StreamJobFactory that runs a job on the local machine. When a command
+ * builder class is configured the job runs as a child process (ProcessJob);
+ * otherwise it runs in a thread of the current process (ThreadJob).
+ */
+class LocalJobFactory extends StreamJobFactory with Logging {
+  def getJob(config: Config): StreamJob = {
+    val taskName = "local-task"
+    val inputPartitions = Util.getMaxInputStreamPartitions(config)
+
+    info("got partitions for job %s" format inputPartitions)
+
+    // Without partitions there is nothing to run.
+    if (inputPartitions.size <= 0) {
+      throw new SamzaException("No partitions were detected for your input streams. It's likely that the system(s) specified don't know about the input streams: %s" format config.getInputStreams)
+    }
+
+    config.getCommandClass match {
+      case Some(builderClassName) => {
+        // A command class was specified, so the command it builds is
+        // executed in its own process through a process job.
+        val commandBuilder = Class.forName(builderClassName).newInstance.asInstanceOf[CommandBuilder]
+
+        commandBuilder.setConfig(config).setName(taskName).setPartitions(inputPartitions)
+
+        // The command line is split on single spaces into program and arguments.
+        val commandLine = commandBuilder.buildCommand.split(" ").toList
+        val childProcessBuilder = new ProcessBuilder(commandLine)
+
+        childProcessBuilder.environment.putAll(commandBuilder.buildEnvironment)
+
+        new ProcessJob(childProcessBuilder)
+      }
+      case _ => {
+        info("No config specified for %s. Defaulting to ThreadJob, which is only meant for debugging." format COMMAND_BUILDER)
+
+        // Give developers a nice friendly warning if they've specified task.opts and are using a threaded job.
+        if (config.getTaskOpts.isDefined) {
+          warn("%s was specified in config, but is not being used because job is being executed with ThreadJob. You probably want to run %s=%s." format (TASK_JVM_OPTS, COMMAND_BUILDER, classOf[ShellCommandBuilder].getName))
+        }
+
+        // No command class was specified, so the job is executed in this
+        // process using a threaded job.
+        new ThreadJob(SamzaContainer(taskName, inputPartitions, config))
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
new file mode 100644
index 0000000..a9ecd97
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.local
+
+import org.apache.samza.job.ApplicationStatus.New
+import org.apache.samza.job.ApplicationStatus.Running
+import org.apache.samza.job.ApplicationStatus.SuccessfulFinish
+import org.apache.samza.job.ApplicationStatus.UnsuccessfulFinish
+import org.apache.samza.job.ApplicationStatus
+import org.apache.samza.job.StreamJob
+import grizzled.slf4j.Logging
+import java.util.concurrent.CountDownLatch
+import java.io.BufferedReader
+import java.io.InputStreamReader
+import java.io.InputStream
+import java.io.OutputStream
+import org.apache.samza.SamzaException
+
+/**
+ * StreamJob that runs a job as a child process created from the given
+ * ProcessBuilder. The child's stdout and stderr are piped to this process'
+ * stdout and stderr.
+ *
+ * Status transitions: New when submit is called, Running once the process
+ * has been started, then SuccessfulFinish (exit code 0) or UnsuccessfulFinish
+ * (non-zero exit code, failed start, or kill).
+ *
+ * @param processBuilder builder used to start the child process
+ */
+class ProcessJob(processBuilder: ProcessBuilder) extends StreamJob with Logging {
+  // Both are written by the process thread and read by callers on other
+  // threads, hence volatile (ThreadJob does the same for its status).
+  @volatile var jobStatus: Option[ApplicationStatus] = None
+  @volatile var process: Process = null
+
+  // Thread that starts the child process and waits for it to exit.
+  @volatile private var procThread: Thread = null
+
+  /**
+   * Starts the child process on a new thread and returns once the start has
+   * either succeeded or failed.
+   */
+  def submit: StreamJob = {
+    val waitForThreadStart = new CountDownLatch(1)
+    jobStatus = Some(New)
+
+    // create a non-daemon thread to make job runner block until the job finishes.
+    // without this, the proc dies when job runner ends.
+    procThread = new Thread {
+      override def run {
+        try {
+          process = processBuilder.start
+
+          // pipe all output to this process's streams
+          val outThread = new Thread(new Piper(process.getInputStream, System.out))
+          val errThread = new Thread(new Piper(process.getErrorStream, System.err))
+          outThread.setDaemon(true)
+          errThread.setDaemon(true)
+          outThread.start
+          errThread.start
+
+          // Set here rather than in submit, so that a process which exits
+          // quickly cannot have its final status overwritten with Running.
+          jobStatus = Some(Running)
+        } catch {
+          case e: Exception => {
+            error("Failing job because the process could not be started.", e)
+            jobStatus = Some(UnsuccessfulFinish)
+            throw e
+          }
+        } finally {
+          // Always release submit, even when the process failed to start;
+          // otherwise submit would block forever.
+          waitForThreadStart.countDown
+        }
+
+        val exitCode = process.waitFor
+
+        // Record how the process ended, unless kill already recorded a status.
+        if (jobStatus == Some(Running)) {
+          jobStatus = Some(if (exitCode == 0) SuccessfulFinish else UnsuccessfulFinish)
+        }
+      }
+    }
+
+    procThread.start
+    waitForThreadStart.await
+
+    ProcessJob.this
+  }
+
+  /**
+   * Destroys the child process and marks the job as UnsuccessfulFinish. Does
+   * nothing when no process has been started.
+   */
+  def kill: StreamJob = {
+    val running = process
+
+    if (running != null) {
+      running.destroy
+      jobStatus = Some(UnsuccessfulFinish)
+    }
+
+    ProcessJob.this
+  }
+
+  /**
+   * Waits up to timeoutMs milliseconds (0 waits indefinitely, as with
+   * Thread.join) for the process thread to finish.
+   *
+   * @return the job status after waiting, or null if the job has no status yet
+   */
+  def waitForFinish(timeoutMs: Long) = {
+    // Joining the process thread (rather than the process itself) guarantees
+    // that the final status has been recorded once the join returns normally.
+    val runner = procThread
+
+    if (runner != null) {
+      runner.join(timeoutMs)
+    }
+
+    jobStatus.getOrElse(null)
+  }
+
+  /**
+   * Polls every 500ms until the job has the given status or timeoutMs
+   * milliseconds have passed.
+   *
+   * @return the job status after waiting, or null if the job has no status yet
+   */
+  def waitForStatus(status: ApplicationStatus, timeoutMs: Long) = {
+    val start = System.currentTimeMillis
+
+    // Compare against the status inside the Option; comparing against the
+    // Option itself never matches and always waits out the full timeout.
+    while (System.currentTimeMillis - start < timeoutMs && !status.equals(jobStatus.getOrElse(null))) {
+      Thread.sleep(500)
+    }
+
+    jobStatus.getOrElse(null)
+  }
+
+  def getStatus = jobStatus.getOrElse(null)
+}
+
+/**
+ * Runnable that forwards bytes from one stream to another until the input
+ * reaches end of stream. Used to pipe output from the subprocess to this
+ * process' stdout/stderr. Both streams are closed when forwarding stops,
+ * and any exception raised while forwarding is rethrown as a SamzaException.
+ */
+class Piper(in: InputStream, out: OutputStream) extends Runnable {
+  def run() {
+    try {
+      val buffer = new Array[Byte](512)
+
+      // read returns -1 at end of stream; every chunk before that is
+      // written and flushed before the next read is issued.
+      Iterator
+        .continually(in.read(buffer, 0, buffer.length))
+        .takeWhile(_ > -1)
+        .foreach { count =>
+          out.write(buffer, 0, count)
+          out.flush()
+        }
+    } catch {
+      case cause: Exception => throw new SamzaException("Broken pipe", cause);
+    } finally {
+      in.close()
+      out.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
new file mode 100644
index 0000000..62994b0
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.local
+
+import grizzled.slf4j.Logging
+import org.apache.samza.job.StreamJob
+import org.apache.samza.job.ApplicationStatus
+import org.apache.samza.job.ApplicationStatus.New
+import org.apache.samza.job.ApplicationStatus.Running
+import org.apache.samza.job.ApplicationStatus.SuccessfulFinish
+import org.apache.samza.job.ApplicationStatus.UnsuccessfulFinish
+
+/**
+ * StreamJob that executes a Runnable on a thread inside the current process.
+ *
+ * Status transitions: New when submit is called, Running just before the
+ * thread is started, then SuccessfulFinish when the runnable returns or
+ * UnsuccessfulFinish when it throws.
+ *
+ * @param runnable the work to execute
+ */
+class ThreadJob(runnable: Runnable) extends StreamJob with Logging {
+  // Written by the job thread and read by callers on other threads.
+  @volatile var jobStatus: Option[ApplicationStatus] = None
+  @volatile var thread: Thread = null
+
+  /**
+   * Starts the runnable on a new thread named "ThreadJob" and returns
+   * immediately.
+   */
+  def submit: StreamJob = {
+    jobStatus = Some(New)
+
+    // create a non-daemon thread to make job runner block until the job finishes.
+    // without this, the proc dies when job runner ends.
+    thread = new Thread {
+      override def run {
+        try {
+          runnable.run
+          jobStatus = Some(SuccessfulFinish)
+        } catch {
+          case e: Throwable => {
+            error("Failing job with exception.", e)
+            jobStatus = Some(UnsuccessfulFinish)
+            throw e
+          }
+        }
+      }
+    }
+    thread.setName("ThreadJob")
+
+    // Mark the job as running before starting the thread. Doing it afterwards
+    // could overwrite the final status of a runnable that finishes (or fails)
+    // before this thread gets to the assignment.
+    jobStatus = Some(Running)
+    thread.start
+
+    ThreadJob.this
+  }
+
+  /**
+   * Interrupts the job thread. Does nothing when the job was never submitted.
+   */
+  def kill: StreamJob = {
+    val running = thread
+
+    if (running != null) {
+      running.interrupt
+    }
+
+    ThreadJob.this
+  }
+
+  /**
+   * Waits up to timeoutMs milliseconds (0 waits indefinitely, as with
+   * Thread.join) for the job thread to finish.
+   *
+   * @return the job status after waiting, or null if the job has no status yet
+   */
+  def waitForFinish(timeoutMs: Long) = {
+    val running = thread
+
+    if (running != null) {
+      running.join(timeoutMs)
+    }
+
+    jobStatus.getOrElse(null)
+  }
+
+  /**
+   * Polls every 500ms until the job has the given status or timeoutMs
+   * milliseconds have passed.
+   *
+   * @return the job status after waiting, or null if the job has no status yet
+   */
+  def waitForStatus(status: ApplicationStatus, timeoutMs: Long) = {
+    val start = System.currentTimeMillis
+
+    while (System.currentTimeMillis - start < timeoutMs && !status.equals(jobStatus.getOrElse(null))) {
+      Thread.sleep(500)
+    }
+
+    jobStatus.getOrElse(null)
+  }
+
+  def getStatus = jobStatus.getOrElse(null)
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala b/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala
new file mode 100644
index 0000000..eee213d
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics
+
+import grizzled.slf4j.Logging
+import java.rmi.server.RMIServerSocketFactory
+import java.net.{ InetAddress, ServerSocket }
+import java.rmi.registry.LocateRegistry
+import management.ManagementFactory
+import java.util
+import javax.management.remote.{ JMXConnectorServerFactory, JMXServiceURL }
+import org.apache.samza.config.Config
+
+/**
+ * Programmatically start the JMX server and its accompanying RMI server. This is necessary in order to reliably
+ * and simply request and know a dynamic port, such that processes on the same machine do not collide when
+ * opening JMX servers on the same port. Server will start upon instantiation.
+ *
+ * Note: This server starts the JMX server, which runs in a separate thread and must be stopped or it will prevent
+ * the process from ending.
+ *
+ * Construction also sets the JMX/RMI system properties for authentication, SSL and the RMI server hostname,
+ * unless they already have a value.
+ *
+ * @param requestedPort Port on which to start JMX server, 0 for ephemeral
+ */
+class JmxServer(requestedPort: Int) extends Logging {
+  /** Creates a server on an ephemeral port (requestedPort = 0). */
+  def this() = this(0)
+
+  // Instance construction. The block builds the connector server and yields it together with
+  // its URL (as a string) and the port the RMI registry was actually bound to.
+  val (jmxServer, url, actualPort) = {
+    // An RMIServerSocketFactory that will tell what port it opened up.  Imagine that.
+    // It remembers the most recently created server socket so that its local port can be read back.
+    class UpfrontRMIServerSocketFactory extends RMIServerSocketFactory {
+      var lastSS: ServerSocket = null
+      def createServerSocket(port: Int): ServerSocket = {
+        lastSS = new ServerSocket(port)
+        lastSS
+      }
+    }
+
+    // Check if the system property has been set and, if not, set it to what we need.
+    // An existing value is left untouched and reported at info level.
+    def updateSystemProperty(prop: String, value: String) = {
+      val existingProp = System.getProperty(prop)
+      if (existingProp == null) {
+        debug("Setting new system property of %s to %s" format (prop, value))
+        System.setProperty(prop, value)
+      } else {
+        info("Not overriding system property %s as already has value %s" format (prop, existingProp))
+      }
+    }
+
+    // Warn when the JVM was asked to start its own JMX server as well.
+    if (System.getProperty("com.sun.management.jmxremote") != null) {
+      warn("System property com.sun.management.jmxremote has been specified, starting the JVM's JMX server as well. " +
+        "This behavior is not well defined and our values will collide with any set on command line.")
+    }
+
+    val hostname = InetAddress.getLocalHost.getHostName
+    info("According to InetAddress.getLocalHost.getHostName we are " + hostname)
+    // Authentication and SSL are switched off unless the properties were already set.
+    updateSystemProperty("com.sun.management.jmxremote.authenticate", "false")
+    updateSystemProperty("com.sun.management.jmxremote.ssl", "false")
+    updateSystemProperty("java.rmi.server.hostname", hostname)
+
+    // Create the RMI registry with our socket factory, then read back the port of the last socket
+    // the factory created. NOTE(review): presumably this is the registry's own listening socket -
+    // confirm that createRegistry opens exactly one server socket through the factory.
+    val ssFactory = new UpfrontRMIServerSocketFactory
+    LocateRegistry.createRegistry(requestedPort, null, ssFactory)
+    val actualPort = ssFactory.lastSS.getLocalPort
+    val mbs = ManagementFactory.getPlatformMBeanServer
+    val env = new util.HashMap[String, Object]()
+    val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + hostname + ":" + actualPort + "/jmxrmi")
+    val jmxServer = JMXConnectorServerFactory.newJMXConnectorServer(url, env, mbs)
+
+    (jmxServer, url.toString, actualPort)
+  }
+
+  // Start the connector server as part of construction.
+  jmxServer.start
+  info("Started " + toString)
+
+  /**
+   * Get RMI port the JMX server is listening on.
+   * @return RMI port
+   */
+  def getPort = actualPort
+
+  /**
+   * Get Jmx URL for this server
+   * @return Jmx-Style URL string
+   */
+  def getJmxUrl = url
+
+  /**
+   * Stop the JMX server. Must be called at program end or will prevent termination.
+   */
+  def stop = jmxServer.stop
+
+  /** Describes the server by its port and JMX URL. */
+  override def toString = "JmxServer port=%d url=%s" format (getPort, getJmxUrl)
+}
+