You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2013/08/12 18:21:37 UTC
[07/15] initial import.
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
new file mode 100644
index 0000000..164a2ee
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics
+
+import java.lang.management.ManagementFactory
+import scala.collection._
+import scala.collection.JavaConversions._
+import java.lang.Thread.State._
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+import grizzled.slf4j.Logging
+import org.apache.samza.util.Util
+import org.apache.samza.util.DaemonThreadFactory
+
+/**
+ * Periodically samples JVM memory, garbage collection and thread statistics
+ * and publishes them through a MetricsRegistry. Modeled directly on Hadoop's
+ * metrics2 JvmMetrics class.
+ *
+ * @param group name of the metrics group that every metric is created in
+ * @param registry registry used to create the gauges and counters
+ */
+class JvmMetrics(group: String, registry: MetricsRegistry) extends Runnable with Logging {
+  // Number of bytes in a megabyte; a float so byte counts divide into fractional MB.
+  final val M = 1024 * 1024.0f
+
+  /** Creates the metrics in the default "samza.jvm" group. */
+  def this(registry: MetricsRegistry) = this("samza.jvm", registry)
+
+  val memoryMXBean = ManagementFactory.getMemoryMXBean()
+  val gcBeans = ManagementFactory.getGarbageCollectorMXBeans()
+  val threadMXBean = ManagementFactory.getThreadMXBean()
+  // Per-collector (count, time) counters keyed by collector name, created on first use.
+  var gcBeanCounters = Map[String, (Counter, Counter)]()
+  val executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory)
+
+  // jvm metrics
+  val gMemNonHeapUsedM = registry.newGauge[Float](group, "MemNonHeapUsedM", 0)
+  val gMemNonHeapCommittedM = registry.newGauge[Float](group, "MemNonHeapCommittedM", 0)
+  val gMemHeapUsedM = registry.newGauge[Float](group, "MemHeapUsedM", 0)
+  val gMemHeapCommittedM = registry.newGauge[Float](group, "MemHeapCommittedM", 0)
+  val gThreadsNew = registry.newGauge[Long](group, "ThreadsNew", 0)
+  val gThreadsRunnable = registry.newGauge[Long](group, "ThreadsRunnable", 0)
+  val gThreadsBlocked = registry.newGauge[Long](group, "ThreadsBlocked", 0)
+  val gThreadsWaiting = registry.newGauge[Long](group, "ThreadsWaiting", 0)
+  val gThreadsTimedWaiting = registry.newGauge[Long](group, "ThreadsTimedWaiting", 0)
+  val gThreadsTerminated = registry.newGauge[Long](group, "ThreadsTerminated", 0)
+  val cGcCount = registry.newCounter(group, "GcCount")
+  val cGcTimeMillis = registry.newCounter(group, "GcTimeMillis")
+
+  /** Schedules this object on the executor: first run immediately, then 5 seconds after each run ends. */
+  def start {
+    executor.scheduleWithFixedDelay(this, 0, 5, TimeUnit.SECONDS)
+  }
+
+  /** Refreshes the memory, GC and thread metrics once. */
+  def run {
+    debug("updating jvm metrics")
+
+    updateMemoryUsage
+    updateGcUsage
+    updateThreadUsage
+
+    debug("updated metrics to: [%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s]" format
+      (gMemNonHeapUsedM, gMemNonHeapCommittedM, gMemHeapUsedM, gMemHeapCommittedM, gThreadsNew,
+        gThreadsRunnable, gThreadsBlocked, gThreadsWaiting, gThreadsTimedWaiting,
+        gThreadsTerminated, cGcCount, cGcTimeMillis))
+  }
+
+  /** Shuts the executor down so no further updates are scheduled. */
+  def stop = executor.shutdown
+
+  /** Publishes used and committed heap / non-heap memory, in megabytes. */
+  private def updateMemoryUsage {
+    val memNonHeap = memoryMXBean.getNonHeapMemoryUsage()
+    val memHeap = memoryMXBean.getHeapMemoryUsage()
+    gMemNonHeapUsedM.set(memNonHeap.getUsed() / M)
+    gMemNonHeapCommittedM.set(memNonHeap.getCommitted() / M)
+    gMemHeapUsedM.set(memHeap.getUsed() / M)
+    gMemHeapCommittedM.set(memHeap.getCommitted() / M)
+  }
+
+  /**
+   * Brings the per-collector and the aggregate GC counters up to the totals
+   * currently reported by the garbage collector beans.
+   */
+  private def updateGcUsage {
+    var count = 0L
+    var timeMillis = 0L
+
+    gcBeans.foreach(gcBean => {
+      // Both getters return -1 when the collector does not define the statistic;
+      // clamp to 0 so the counters never go negative and the totals are not skewed.
+      val c = math.max(0L, gcBean.getCollectionCount())
+      val t = math.max(0L, gcBean.getCollectionTime())
+      val (gcCount, gcTime) = getGcInfo(gcBean.getName)
+      // The beans report running totals, so increment by the difference from what was recorded.
+      gcCount.inc(c - gcCount.getCount())
+      gcTime.inc(t - gcTime.getCount())
+      count += c
+      timeMillis += t
+    })
+
+    cGcCount.inc(count - cGcCount.getCount())
+    cGcTimeMillis.inc(timeMillis - cGcTimeMillis.getCount())
+  }
+
+  /**
+   * Returns the (count, time) counter pair for a collector, creating and
+   * remembering the pair the first time the collector name is seen.
+   */
+  private def getGcInfo(gcName: String): (Counter, Counter) = {
+    gcBeanCounters.get(gcName) match {
+      case Some(gcBeanCounterTuple) => gcBeanCounterTuple
+      case _ => {
+        val t = (registry.newCounter(group, "GcCount" + gcName), registry.newCounter(group, "GcTimeMillis" + gcName))
+        gcBeanCounters += (gcName -> t)
+        t
+      }
+    }
+  }
+
+  /** Counts the live threads by state and publishes one gauge per state. */
+  private def updateThreadUsage {
+    var threadsNew = 0L
+    var threadsRunnable = 0L
+    var threadsBlocked = 0L
+    var threadsWaiting = 0L
+    var threadsTimedWaiting = 0L
+    var threadsTerminated = 0L
+    val threadIds = threadMXBean.getAllThreadIds
+
+    threadMXBean.getThreadInfo(threadIds, 0).foreach(threadInfo =>
+      // race protection: an entry is null when the thread is gone by the time its info is read.
+      Option(threadInfo).foreach(_.getThreadState match {
+        case NEW => threadsNew += 1
+        case RUNNABLE => threadsRunnable += 1
+        case BLOCKED => threadsBlocked += 1
+        case WAITING => threadsWaiting += 1
+        case TIMED_WAITING => threadsTimedWaiting += 1
+        case TERMINATED => threadsTerminated += 1
+      }))
+
+    gThreadsNew.set(threadsNew)
+    gThreadsRunnable.set(threadsRunnable)
+    gThreadsBlocked.set(threadsBlocked)
+    gThreadsWaiting.set(threadsWaiting)
+    gThreadsTimedWaiting.set(threadsTimedWaiting)
+    gThreadsTerminated.set(threadsTerminated)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
new file mode 100644
index 0000000..fc0bd38
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics
+import grizzled.slf4j.Logging
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * Registry that keeps every metric created through it in a two-level map
+ * (group name, then metric name) and tells its listeners about each counter
+ * or gauge that is requested. It can be registered with one or more
+ * MetricReporters to flush metrics.
+ */
+class MetricsRegistryMap extends ReadableMetricsRegistry with Logging {
+  var listeners = Set[ReadableMetricsRegistryListener]()
+
+  /*
+   * groupName -> metricName -> metric
+   */
+  val metrics = new ConcurrentHashMap[String, ConcurrentHashMap[String, Metric]]
+
+  /**
+   * Returns the counter stored under (group, name), creating it if nothing
+   * is stored there yet, and notifies every listener.
+   */
+  def newCounter(group: String, name: String) = {
+    debug("Creating new counter %s %s.".format(group, name))
+    val groupMetrics = putAndGetGroup(group)
+    groupMetrics.putIfAbsent(name, new Counter(name))
+    val counter = groupMetrics.get(name).asInstanceOf[Counter]
+    for (listener <- listeners) {
+      listener.onCounter(group, counter)
+    }
+    counter
+  }
+
+  /**
+   * Returns the gauge stored under (group, name), creating it with the given
+   * initial value if nothing is stored there yet, and notifies every listener.
+   */
+  def newGauge[T](group: String, name: String, value: T) = {
+    debug("Creating new gauge %s %s %s.".format(group, name, value))
+    val groupMetrics = putAndGetGroup(group)
+    groupMetrics.putIfAbsent(name, new Gauge[T](name, value))
+    val gauge = groupMetrics.get(name).asInstanceOf[Gauge[T]]
+    for (listener <- listeners) {
+      listener.onGauge(group, gauge)
+    }
+    gauge
+  }
+
+  /** Returns the metric map of a group, inserting an empty one first if the group is new. */
+  private def putAndGetGroup(group: String) = {
+    val emptyGroup = new ConcurrentHashMap[String, Metric]
+    metrics.putIfAbsent(group, emptyGroup)
+    metrics.get(group)
+  }
+
+  /** Names of all groups that currently hold metrics. */
+  def getGroups = metrics.keySet()
+
+  /** Metric map of the given group, or null when the group does not exist. */
+  def getGroup(group: String) = metrics.get(group)
+
+  override def toString() = metrics.toString
+
+  /** Adds a listener to be told about counters and gauges requested from now on. */
+  def listen(listener: ReadableMetricsRegistryListener) {
+    listeners = listeners + listener
+  }
+
+  /** Removes a previously added listener. */
+  def unlisten(listener: ReadableMetricsRegistryListener) {
+    listeners = listeners - listener
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
new file mode 100644
index 0000000..8814e68
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter
+
+import java.lang.management.ManagementFactory
+import grizzled.slf4j.Logging
+import javax.management.MBeanServer
+import javax.management.ObjectName
+import org.apache.samza.config.Config
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.Gauge
+import org.apache.samza.metrics.MetricsReporter
+import org.apache.samza.metrics.MetricsReporterFactory
+import org.apache.samza.metrics.ReadableMetricsRegistry
+import org.apache.samza.metrics.ReadableMetricsRegistryListener
+import scala.collection.JavaConversions._
+import org.apache.samza.metrics.MetricsVisitor
+
+/**
+ * MetricsReporter that exposes counters and gauges as MBeans on the given
+ * MBean server. Registries are remembered by register; start attaches a
+ * listener to each of them and registers beans for the metrics they
+ * already hold.
+ */
+class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
+  var sources = Map[ReadableMetricsRegistry, String]()
+  var listeners = Map[ReadableMetricsRegistry, ReadableMetricsRegistryListener]()
+
+  /** Attaches the listeners and registers beans for all metrics that already exist. */
+  def start() {
+    listeners.foreach {
+      case (registry, listener) =>
+        // Listen first, so that metrics created from now on are picked up.
+        registry.listen(listener)
+
+        // Then walk over everything the registry already contains.
+        registry.getGroups.foreach(group => {
+          registry.getGroup(group).foreach {
+            case (name, metric) => metric.visit(beanRegisteringVisitor(registry, group, name))
+          }
+        })
+    }
+  }
+
+  /** Builds a visitor that registers a bean named after (group, name, source of the registry). */
+  private def beanRegisteringVisitor(registry: ReadableMetricsRegistry, group: String, name: String): MetricsVisitor =
+    new MetricsVisitor {
+      def counter(counter: Counter) =
+        registerBean(new JmxCounter(counter, getObjectName(group, name, sources(registry))))
+      def gauge[T](gauge: Gauge[T]) =
+        registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, name, sources(registry))))
+    }
+
+  /**
+   * Remembers a registry and its source name, together with a listener that
+   * registers a bean for every counter or gauge it is told about. A registry
+   * that is already known is ignored with a warning.
+   */
+  def register(source: String, registry: ReadableMetricsRegistry) {
+    if (listeners.contains(registry)) {
+      warn("Trying to re-register a registry for source %s. Ignoring.".format(source))
+    } else {
+      val listener: ReadableMetricsRegistryListener = new ReadableMetricsRegistryListener {
+        def onCounter(group: String, counter: Counter) {
+          registerBean(new JmxCounter(counter, getObjectName(group, counter.getName, source)))
+        }
+
+        def onGauge(group: String, gauge: Gauge[_]) {
+          registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, gauge.getName, source)))
+        }
+      }
+
+      sources += registry -> source
+      listeners += registry -> listener
+    }
+  }
+
+  /** Detaches every listener from its registry. */
+  def stop() {
+    listeners.foreach {
+      case (registry, listener) => registry.unlisten(listener)
+    }
+  }
+
+  /** Builds the ObjectName "group:type=t,name=name" from the escaped parts. */
+  def getObjectName(group: String, name: String, t: String) = {
+    val rawName = makeNameJmxSafe(group) + ":type=" + makeNameJmxSafe(t) + ",name=" + makeNameJmxSafe(name)
+    val objName = new ObjectName(rawName)
+    debug("Resolved name for %s, %s, %s to: %s".format(group, name, t, objName))
+    objName
+  }
+
+  /*
+   * JMX only has ObjectName.quote, which is pretty nasty looking. This
+   * function escapes without quoting, using the rules outlined in:
+   * http://docs.oracle.com/javase/1.5.0/docs/api/javax/management/ObjectName.html
+   */
+  def makeNameJmxSafe(str: String) =
+    Seq(",", "=", ":", "\"", "*", "?").foldLeft(str)((escaped, reserved) => escaped.replace(reserved, "_"))
+
+  /** Registers the bean under its own object name unless that name is already registered. */
+  def registerBean(bean: MetricMBean) {
+    val beanName = bean.objectName
+    if (!server.isRegistered(beanName)) {
+      debug("Registering MBean for %s.".format(beanName))
+      server.registerMBean(bean, beanName)
+    }
+  }
+}
+
+/**
+ * Base interface for the beans that JmxReporter registers: each bean knows the
+ * ObjectName it should be registered under.
+ */
+trait MetricMBean {
+ def objectName(): ObjectName
+}
+
+/**
+ * Convenience base class that answers objectName with the ObjectName passed to
+ * its constructor.
+ */
+abstract class AbstractBean(val on: ObjectName) extends MetricMBean {
+ override def objectName = on
+}
+
+/**
+ * Management interface for gauge beans; exposes the gauge's current value.
+ * NOTE(review): the name appears to follow the JMX standard MBean convention
+ * (implementation class JmxGauge, interface JmxGaugeMBean), so renaming either
+ * one would likely break registration -- confirm before renaming.
+ */
+trait JmxGaugeMBean extends MetricMBean {
+ def getValue(): Object
+}
+
+/**
+ * Gauge bean: getValue reads the wrapped gauge each time it is called.
+ *
+ * @param g gauge whose value is exposed
+ * @param on name returned from objectName
+ */
+class JmxGauge(g: org.apache.samza.metrics.Gauge[Object], on: ObjectName) extends JmxGaugeMBean {
+ def getValue = g.getValue
+ def objectName = on
+}
+
+/**
+ * Management interface for counter beans; exposes the counter's current count.
+ * NOTE(review): the name appears to follow the JMX standard MBean convention
+ * (implementation class JmxCounter, interface JmxCounterMBean), so renaming
+ * either one would likely break registration -- confirm before renaming.
+ */
+trait JmxCounterMBean extends MetricMBean {
+ def getCount(): Long
+}
+
+/**
+ * Counter bean: getCount reads the wrapped counter each time it is called.
+ *
+ * @param c counter whose count is exposed
+ * @param on name returned from objectName
+ */
+class JmxCounter(c: org.apache.samza.metrics.Counter, on: ObjectName) extends JmxCounterMBean {
+ def getCount() = c.getCount()
+ def objectName = on
+}
+
+/**
+ * Factory that builds JmxReporter instances bound to the platform MBean
+ * server. The container name and config arguments are not used.
+ */
+class JmxReporterFactory extends MetricsReporterFactory with Logging {
+  def getMetricsReporter(name: String, containerName: String, config: Config) = {
+    info("Creating JMX reporter with name %s.".format(name))
+    val platformServer = ManagementFactory.getPlatformMBeanServer
+    new JmxReporter(platformServer)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala
new file mode 100644
index 0000000..d7aec8b
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter
+
+import java.util.Collections
+import java.util.HashMap
+import java.util.Map
+import scala.collection.JavaConversions._
+
+/**
+ * Companion with a factory method for Metrics.
+ */
+object Metrics {
+  /** Wraps a group name -> metric name -> value map in a Metrics snapshot. */
+  def fromMap(map: Map[String, Map[String, Object]]): Metrics = new Metrics(map)
+}
+
+/**
+ * Immutable metrics snapshot. The constructor copies the supplied
+ * group name -> metric name -> value map, so later changes to the argument
+ * are not reflected in the snapshot.
+ *
+ * @param metrics metric values keyed by group name, then metric name
+ */
+class Metrics(metrics: Map[String, Map[String, Object]]) {
+  val immutableMetrics = new HashMap[String, Map[String, Object]]
+
+  for (groupEntry <- metrics.entrySet) {
+    // Copy each group so the snapshot does not alias the caller's maps, then make it read-only.
+    val immutableMetricGroup = new HashMap[String, Object](groupEntry.getValue)
+
+    immutableMetrics.put(groupEntry.getKey, Collections.unmodifiableMap(immutableMetricGroup))
+  }
+
+  /**
+   * Looks up a single metric value and casts it to T.
+   *
+   * @return the value, or null when either the group or the metric is unknown
+   */
+  def get[T](group: String, metricName: String) = {
+    val groupMetrics = immutableMetrics.get(group)
+    // An unknown group is treated like an unknown metric: null, not a NullPointerException.
+    val value = if (groupMetrics == null) null else groupMetrics.get(metricName)
+    value.asInstanceOf[T]
+  }
+
+  /** Returns the read-only metric map of a group, or null when the group is unknown. */
+  def get(group: String) = immutableMetrics.get(group)
+
+  /** Returns a read-only view of the whole snapshot. */
+  def getAsMap(): Map[String, Map[String, Object]] = Collections.unmodifiableMap(immutableMetrics)
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala
new file mode 100644
index 0000000..369c718
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter
+
+import java.util.HashMap
+import java.util.Map
+import scala.reflect.BeanProperty
+
+/**
+ * Companion with a factory that rebuilds a MetricsHeader from the map form
+ * produced by MetricsHeader.getAsMap.
+ */
+object MetricsHeader {
+  /**
+   * Reads the nine header fields from the map. Text fields are taken via
+   * toString and the two timestamps via Number.longValue, so every key must
+   * be present.
+   */
+  def fromMap(map: Map[String, Object]): MetricsHeader = {
+    def text(key: String) = map.get(key).toString
+    def millis(key: String) = map.get(key).asInstanceOf[Number].longValue
+
+    new MetricsHeader(
+      text("job-name"),
+      text("job-id"),
+      text("container-name"),
+      text("source"),
+      text("version"),
+      text("samza-version"),
+      text("host"),
+      millis("time"),
+      millis("reset-time"))
+  }
+}
+
+/**
+ * Immutable metric header snapshot: the identifying fields that accompany a
+ * set of metric values. Every field also gets a bean-style getter.
+ */
+class MetricsHeader(
+  @BeanProperty val jobName: String,
+  @BeanProperty val jobId: String,
+  @BeanProperty val containerName: String,
+  @BeanProperty val source: String,
+  @BeanProperty val version: String,
+  @BeanProperty val samzaVersion: String,
+  @BeanProperty val host: String,
+  @BeanProperty val time: Long,
+  @BeanProperty val resetTime: Long) {
+
+  /** Returns a fresh map with one entry per header field; the timestamps are boxed Longs. */
+  def getAsMap: Map[String, Object] = {
+    val fields = Seq[(String, Object)](
+      "job-name" -> jobName,
+      "job-id" -> jobId,
+      "container-name" -> containerName,
+      "source" -> source,
+      "version" -> version,
+      "samza-version" -> samzaVersion,
+      "host" -> host,
+      "time" -> java.lang.Long.valueOf(time),
+      "reset-time" -> java.lang.Long.valueOf(resetTime))
+
+    val headerMap = new HashMap[String, Object]
+    fields.foreach { case (key, value) => headerMap.put(key, value) }
+    headerMap
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshot.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshot.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshot.scala
new file mode 100644
index 0000000..da775f7
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshot.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter
+
+import java.util.Map
+import java.util.HashMap
+import scala.reflect.BeanProperty
+
+/**
+ * Companion with a factory that rebuilds a MetricsSnapshot from its map form.
+ */
+object MetricsSnapshot {
+  /** Reads the "header" and "metrics" entries of the map and wraps them in a snapshot. */
+  def fromMap(map: Map[String, Map[String, Object]]) =
+    new MetricsSnapshot(
+      MetricsHeader.fromMap(map.get("header")),
+      Metrics.fromMap(map.get("metrics").asInstanceOf[Map[String, Map[String, Object]]]))
+}
+
+/**
+ * A metrics header paired with the metric values it describes.
+ */
+class MetricsSnapshot(@BeanProperty val header: MetricsHeader, @BeanProperty val metrics: Metrics) {
+  /** Returns a fresh map holding the header under "header" and the values under "metrics". */
+  def getAsMap(): Map[String, Object] = {
+    val snapshotMap = new HashMap[String, Object]
+    snapshotMap.put("header", header.getAsMap)
+    snapshotMap.put("metrics", metrics.getAsMap)
+    snapshotMap
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
new file mode 100644
index 0000000..79c647e
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter
+
+import java.util.HashMap
+import java.util.Map
+import scala.collection.JavaConversions._
+import grizzled.slf4j.Logging
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.Gauge
+import org.apache.samza.metrics.MetricsReporter
+import org.apache.samza.metrics.MetricsVisitor
+import org.apache.samza.metrics.ReadableMetricsRegistry
+import java.util.concurrent.Executors
+import org.apache.samza.util.DaemonThreadFactory
+import java.util.concurrent.TimeUnit
+import org.apache.samza.serializers.Serializer
+import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.OutgoingMessageEnvelope
+
+/**
+ * MetricsSnapshotReporter is a generic metrics reporter that sends metrics to a stream.
+ * Once started, it builds one MetricsSnapshot per registered source every
+ * 60 seconds and sends it through the producer.
+ *
+ * @param producer producer used to send the snapshots
+ * @param out stream that the snapshots are sent to
+ * @param jobName name of the job, e.g. my-samza-job
+ * @param jobId an id that differentiates multiple executions of the same job
+ * @param containerName name of the container, e.g. container_567890
+ * @param version version of the job, e.g. 0.0.1
+ * @param samzaVersion version of Samza itself
+ * @param host host the reporter runs on, e.g. eat1-app128.gird; also used as the message key
+ * @param serializer when non-null, snapshots are converted to bytes with it before sending
+ * @param clock source of the current time in milliseconds
+ */
+class MetricsSnapshotReporter(
+  producer: SystemProducer,
+  out: SystemStream,
+  jobName: String,
+  jobId: String,
+  containerName: String,
+  version: String,
+  samzaVersion: String,
+  host: String,
+  serializer: Serializer[MetricsSnapshot] = null,
+  clock: () => Long = () => { System.currentTimeMillis }) extends MetricsReporter with Runnable with Logging {
+
+  val executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory)
+  // Taken once at construction and reported as "reset-time" in every header.
+  val resetTime = clock()
+  var registries = List[(String, ReadableMetricsRegistry)]()
+
+  info("got metrics snapshot reporter properties [job name: %s, job id: %s, containerName: %s, version: %s, samzaVersion: %s, host: %s]"
+    format (jobName, jobId, containerName, version, samzaVersion, host))
+
+  /** Starts the producer and schedules the periodic flush. */
+  def start {
+    info("Starting producer.")
+
+    producer.start
+
+    info("Starting reporter timer.")
+
+    // TODO could make this configurable.
+    executor.scheduleWithFixedDelay(this, 0, 60, TimeUnit.SECONDS)
+  }
+
+  /** Adds a registry to flush and registers its source name with the producer. */
+  def register(source: String, registry: ReadableMetricsRegistry) {
+    registries ::= (source, registry)
+
+    info("Registering %s with producer." format source)
+
+    producer.register(source)
+  }
+
+  /**
+   * Stops the flush timer, waiting up to 60 seconds for a running flush to
+   * finish, and then stops the producer.
+   */
+  def stop = {
+    // The timer goes first: stopping the producer while a flush is still
+    // scheduled or running would let that flush send to a stopped producer.
+    info("Stopping reporter timer.")
+
+    executor.shutdown
+    executor.awaitTermination(60, TimeUnit.SECONDS)
+
+    info("Stopping producer.")
+
+    producer.stop
+
+    if (!executor.isTerminated) {
+      warn("Unable to shutdown reporter timer.")
+    }
+  }
+
+  /** Sends one snapshot per registered source and commits it immediately. */
+  def run {
+    debug("Begin flushing metrics.")
+
+    for ((source, registry) <- registries) {
+      debug("Flushing metrics for %s." format source)
+
+      val metricsMsg = new HashMap[String, Map[String, Object]]
+
+      // metrics
+      registry.getGroups.foreach(group => {
+        val groupMsg = new HashMap[String, Object]
+
+        registry.getGroup(group).foreach {
+          case (name, metric) =>
+            // The visitor stores the metric's current value under its name.
+            metric.visit(new MetricsVisitor {
+              def counter(counter: Counter) = groupMsg.put(name, counter.getCount: java.lang.Long)
+              def gauge[T](gauge: Gauge[T]) = groupMsg.put(name, gauge.getValue.asInstanceOf[Object])
+            })
+        }
+
+        metricsMsg.put(group, groupMsg)
+      })
+
+      val header = new MetricsHeader(jobName, jobId, containerName, source, version, samzaVersion, host, clock(), resetTime)
+      val metrics = new Metrics(metricsMsg)
+
+      debug("Flushing metrics for %s to %s with header and map: header=%s, map=%s." format (source, out, header.getAsMap, metrics.getAsMap))
+
+      val metricsSnapshot = new MetricsSnapshot(header, metrics)
+      // Without a serializer the snapshot object itself is handed to the producer.
+      val maybeSerialized = if (serializer != null) {
+        serializer.toBytes(metricsSnapshot)
+      } else {
+        metricsSnapshot
+      }
+
+      producer.send(source, new OutgoingMessageEnvelope(out, host, null, maybeSerialized))
+
+      // Always commit, since we don't want metrics to get batched up.
+      producer.commit(source)
+    }
+
+    debug("Finished flushing metrics.")
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
new file mode 100644
index 0000000..f20dc36
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter
+
+import java.net.InetAddress
+import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
+import org.apache.samza.config.Config
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.MetricsConfig.Config2Metrics
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.config.SerializerConfig.Config2Serializer
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.metrics.MetricsReporter
+import org.apache.samza.metrics.MetricsReporterFactory
+import org.apache.samza.util.Util
+import org.apache.samza.metrics.ReadableMetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.serializers.Serializer
+import org.apache.samza.serializers.SerdeFactory
+import org.apache.samza.system.SystemFactory
+
+/**
+ * Factory that builds a MetricsSnapshotReporter from the job config. It
+ * resolves the metrics stream, a producer for the stream's system and,
+ * when one is configured, a serde for the snapshots.
+ */
+class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging {
+  /**
+   * @param name reporter name, used to look up the metrics stream in config
+   * @param containerName container name stamped into every snapshot header
+   * @param config job config
+   * @return a reporter that already has the producer's own metrics registry registered
+   * @throws SamzaException if the job name, task class, metrics stream or system factory is missing from config
+   */
+  def getMetricsReporter(name: String, containerName: String, config: Config): MetricsReporter = {
+    info("Creating new metrics snapshot reporter.")
+
+    val jobName = config
+      .getName
+      .getOrElse(throw new SamzaException("Job name must be defined in config."))
+
+    val jobId = config
+      .getJobId
+      .getOrElse(1.toString)
+
+    val taskClass = config
+      .getTaskClass
+      .getOrElse(throw new SamzaException("No task class defined for config."))
+
+    // Class.getPackage may return null, so guard it as well as the version itself.
+    val version = Option(Class.forName(taskClass).getPackage)
+      .flatMap(pkg => Option(pkg.getImplementationVersion))
+      .getOrElse({
+        warn("Unable to find implementation version in jar's meta info. Defaulting to 0.0.1.")
+        "0.0.1"
+      })
+
+    val samzaVersion = Option(classOf[MetricsSnapshotReporterFactory].getPackage)
+      .flatMap(pkg => Option(pkg.getImplementationVersion))
+      .getOrElse({
+        warn("Unable to find implementation samza version in jar's meta info. Defaulting to 0.0.1.")
+        "0.0.1"
+      })
+
+    val metricsSystemStreamName = config
+      .getMetricsReporterStream(name)
+      .getOrElse(throw new SamzaException("No metrics stream defined in config."))
+
+    val systemStream = Util.getSystemStreamFromNames(metricsSystemStreamName)
+
+    info("Got system stream %s." format systemStream)
+
+    val systemName = systemStream.getSystem
+
+    val systemFactoryClassName = config
+      .getSystemFactory(systemName)
+      .getOrElse(throw new SamzaException("Trying to fetch system factory for system %s, which isn't defined in config." format systemName))
+
+    val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+
+    info("Got system factory %s." format systemFactory)
+
+    val registry = new MetricsRegistryMap
+
+    val producer = systemFactory.getProducer(systemName, config, registry)
+
+    info("Got producer %s." format producer)
+
+    // A serde configured for the stream takes precedence over one configured for the system.
+    val streamSerdeName = config.getStreamMsgSerde(systemStream)
+    val systemSerdeName = config.getSystemMsgSerde(systemName)
+    val serdeName = streamSerdeName.getOrElse(systemSerdeName.getOrElse(null))
+    val serde = if (serdeName != null) {
+      config.getSerdeClass(serdeName) match {
+        // Keep the factory class name separate from the serde name: the class
+        // name instantiates the factory, the serde name is what getSerde expects.
+        case Some(serdeClassName) =>
+          Util
+            .getObj[SerdeFactory[MetricsSnapshot]](serdeClassName)
+            .getSerde(serdeName, config)
+        case _ => null
+      }
+    } else {
+      null
+    }
+
+    info("Got serde %s." format serde)
+
+    val reporter = new MetricsSnapshotReporter(
+      producer,
+      systemStream,
+      jobName,
+      jobId,
+      containerName,
+      version,
+      samzaVersion,
+      InetAddress.getLocalHost().getHostName(),
+      serde)
+
+    // Report the producer's own metrics through the reporter as well.
+    reporter.register(this.getClass.getSimpleName.toString, registry)
+
+    reporter
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
new file mode 100644
index 0000000..574c584
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import scala.collection.JavaConversions._
+import org.codehaus.jackson.map.ObjectMapper
+import org.apache.samza.system.SystemStream
+import org.apache.samza.checkpoint.Checkpoint
+
+/**
+ * JSON serde for [[org.apache.samza.checkpoint.Checkpoint]].
+ *
+ * The wire format is a two-level JSON object: system name -> (stream name -> offset).
+ */
+class CheckpointSerde extends Serde[Checkpoint] {
+  val jsonMapper = new ObjectMapper()
+
+  /**
+   * Decodes a checkpoint from its JSON representation.
+   *
+   * @param bytes JSON of the form {"system": {"stream": "offset", ...}, ...}
+   * @return the decoded checkpoint, or null when the bytes cannot be decoded
+   */
+  def fromBytes(bytes: Array[Byte]): Checkpoint = {
+    try {
+      val checkpointMap = jsonMapper
+        .readValue(bytes, classOf[java.util.Map[String, java.util.Map[String, String]]])
+        .flatMap {
+          case (systemName, streamToOffsetMap) =>
+            streamToOffsetMap.map { case (streamName, offset) => (new SystemStream(systemName, streamName), offset) }
+        }
+      new Checkpoint(checkpointMap)
+    } catch {
+      // Undecodable input is reported as null, as before. Only Exceptions are
+      // caught so that fatal JVM errors (OutOfMemoryError, etc.) are no longer
+      // silently swallowed.
+      case e: Exception => null
+    }
+  }
+
+  /**
+   * Encodes a checkpoint as JSON, nesting the offsets by system and then by stream.
+   *
+   * @param checkpoint the checkpoint whose offsets are written
+   * @return the JSON bytes produced by the Jackson mapper
+   */
+  def toBytes(checkpoint: Checkpoint) = {
+    val offsetMap = asJavaMap(checkpoint
+      .getOffsets
+      // Convert Map[SystemStream, String] offset map to an iterable of tuples (system, stream, offset)
+      .map { case (systemStream, offset) => (systemStream.getSystem, systemStream.getStream, offset) }
+      // Group the tuples by system name
+      .groupBy(_._1)
+      // Turn the tuples for each system into a Map[String, String] of stream to offset
+      .map {
+        case (systemName, tuples) =>
+          val streamToOffsetMap = asJavaMap(tuples
+            // Group the tuples by stream name
+            .groupBy(_._2)
+            // There should only ever be one SystemStream to offset mapping, so just
+            // grab the first element from the tuple list for each stream.
+            .map { case (streamName, streamTuples) => (streamName, streamTuples.head._3) }
+            .toMap)
+          (systemName, streamToOffsetMap)
+      }.toMap)
+
+    jsonMapper.writeValueAsBytes(offsetMap)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
new file mode 100644
index 0000000..4f3ff6e
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.apache.samza.SamzaException
+import org.apache.samza.config.SerializerConfig
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.OutgoingMessageEnvelope
+import org.apache.samza.system.IncomingMessageEnvelope
+
+/**
+ * Applies the configured serdes to the keys and messages of outgoing and
+ * incoming envelopes.
+ *
+ * Change log streams are never touched (storage engines handle their own
+ * serde). Otherwise the first match wins, in this order: a serde named on the
+ * envelope itself (outgoing only), a serde registered for the envelope's
+ * SystemStream, a serde registered for the envelope's system. When nothing
+ * matches, the object is passed through unchanged.
+ *
+ * @param serdes serdes by name, used for envelopes that name their serializer
+ * @param systemKeySerdes key serdes by system name
+ * @param systemMessageSerdes message serdes by system name
+ * @param systemStreamKeySerdes key serdes by system stream
+ * @param systemStreamMessageSerdes message serdes by system stream
+ * @param changeLogSystemStreams streams whose envelopes bypass serde entirely
+ */
+class SerdeManager(
+  serdes: Map[String, Serde[Object]] = Map(),
+  systemKeySerdes: Map[String, Serde[Object]] = Map(),
+  systemMessageSerdes: Map[String, Serde[Object]] = Map(),
+  systemStreamKeySerdes: Map[SystemStream, Serde[Object]] = Map(),
+  systemStreamMessageSerdes: Map[SystemStream, Serde[Object]] = Map(),
+  changeLogSystemStreams: Set[SystemStream] = Set()) {
+
+  /**
+   * Serializes an object with the serde registered under the given name.
+   * Throws SamzaException when no serde has that name.
+   */
+  def toBytes(obj: Object, serializerName: String) = serdes
+    .getOrElse(serializerName, throw new SamzaException("No serde defined for %s" format serializerName))
+    .toBytes(obj)
+
+  /**
+   * Returns a copy of the envelope with its key and message serialized. The
+   * serializer names on the returned envelope are null, since the payload has
+   * already been serialized.
+   */
+  def toBytes(envelope: OutgoingMessageEnvelope): OutgoingMessageEnvelope = {
+    val systemStream = envelope.getSystemStream
+
+    val key = if (changeLogSystemStreams.contains(systemStream)) {
+      // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
+      envelope.getKey
+    } else if (envelope.getKeySerializerName != null) {
+      // If a serde is defined, use it.
+      toBytes(envelope.getKey, envelope.getKeySerializerName)
+    } else if (systemStreamKeySerdes.contains(systemStream)) {
+      // If the stream has a serde defined, use it.
+      systemStreamKeySerdes(systemStream).toBytes(envelope.getKey)
+    } else if (systemKeySerdes.contains(systemStream.getSystem)) {
+      // If the system has a serde defined, use it.
+      systemKeySerdes(systemStream.getSystem).toBytes(envelope.getKey)
+    } else {
+      // Just use the object.
+      envelope.getKey
+    }
+
+    val message = if (changeLogSystemStreams.contains(systemStream)) {
+      // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
+      envelope.getMessage
+    } else if (envelope.getMessageSerializerName != null) {
+      // If a serde is defined, use it.
+      toBytes(envelope.getMessage, envelope.getMessageSerializerName)
+    } else if (systemStreamMessageSerdes.contains(systemStream)) {
+      // If the stream has a serde defined, use it.
+      systemStreamMessageSerdes(systemStream).toBytes(envelope.getMessage)
+    } else if (systemMessageSerdes.contains(systemStream.getSystem)) {
+      // If the system has a serde defined, use it.
+      systemMessageSerdes(systemStream.getSystem).toBytes(envelope.getMessage)
+    } else {
+      // Just use the object.
+      envelope.getMessage
+    }
+
+    new OutgoingMessageEnvelope(
+      systemStream,
+      null,
+      null,
+      envelope.getPartitionKey,
+      key,
+      message)
+  }
+
+  /**
+   * Deserializes bytes with the serde registered under the given name.
+   * Throws SamzaException when no serde has that name.
+   */
+  def fromBytes(bytes: Array[Byte], deserializerName: String) = serdes
+    .getOrElse(deserializerName, throw new SamzaException("No serde defined for %s" format deserializerName))
+    .fromBytes(bytes)
+
+  /**
+   * Returns a copy of the envelope with its key and message deserialized.
+   * When a serde applies, the key/message is cast to Array[Byte] first.
+   */
+  def fromBytes(envelope: IncomingMessageEnvelope) = {
+    // The per-stream serde maps are keyed by SystemStream, so look them up with
+    // the partition's SystemStream (as the change log check already does)
+    // rather than with the SystemStreamPartition itself.
+    // NOTE(review): assumes a SystemStreamPartition does not compare equal to
+    // its SystemStream when used as a map key - confirm against SystemStream.equals.
+    val systemStream = envelope.getSystemStreamPartition.getSystemStream
+    val systemName = envelope.getSystemStreamPartition.getSystem
+
+    val key = if (changeLogSystemStreams.contains(systemStream)) {
+      // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
+      envelope.getKey
+    } else if (systemStreamKeySerdes.contains(systemStream)) {
+      // If the stream has a serde defined, use it.
+      systemStreamKeySerdes(systemStream).fromBytes(envelope.getKey.asInstanceOf[Array[Byte]])
+    } else if (systemKeySerdes.contains(systemName)) {
+      // If the system has a serde defined, use it.
+      systemKeySerdes(systemName).fromBytes(envelope.getKey.asInstanceOf[Array[Byte]])
+    } else {
+      // Just use the object.
+      envelope.getKey
+    }
+
+    val message = if (changeLogSystemStreams.contains(systemStream)) {
+      // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
+      envelope.getMessage
+    } else if (systemStreamMessageSerdes.contains(systemStream)) {
+      // If the stream has a serde defined, use it.
+      systemStreamMessageSerdes(systemStream).fromBytes(envelope.getMessage.asInstanceOf[Array[Byte]])
+    } else if (systemMessageSerdes.contains(systemName)) {
+      // If the system has a serde defined, use it.
+      systemMessageSerdes(systemName).fromBytes(envelope.getMessage.asInstanceOf[Array[Byte]])
+    } else {
+      // Just use the object.
+      envelope.getMessage
+    }
+
+    new IncomingMessageEnvelope(
+      envelope.getSystemStreamPartition,
+      envelope.getOffset,
+      key,
+      message)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/serializers/StringSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/StringSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/StringSerde.scala
new file mode 100644
index 0000000..45cde95
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/StringSerde.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.nio.ByteBuffer
+import org.apache.samza.config.Config
+
+/**
+ * Factory for string serdes. The character encoding is read from the
+ * "encoding" config key and defaults to UTF-8; the serde name argument is
+ * not consulted.
+ */
+class StringSerdeFactory extends SerdeFactory[String] {
+  def getSerde(name: String, config: Config): Serde[String] = {
+    val encoding = config.get("encoding", "UTF-8")
+    new StringSerde(encoding)
+  }
+}
+
+/**
+ * Converts strings to and from bytes using a fixed character encoding.
+ * A null input yields a null output in both directions instead of a
+ * NullPointerException.
+ *
+ * @param encoding name of the charset handed to the JDK (e.g. "UTF-8")
+ */
+class StringSerde(val encoding: String) extends Serde[String] {
+  /** Encodes the string with the configured encoding; null stays null. */
+  def toBytes(obj: String): Array[Byte] =
+    if (obj != null) obj.getBytes(encoding) else null
+
+  /** Decodes the whole byte array with the configured encoding; null stays null. */
+  def fromBytes(bytes: Array[Byte]): String =
+    if (bytes != null) new String(bytes, encoding) else null
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
new file mode 100644
index 0000000..f4c0194
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage
+
+import java.io.File
+import scala.collection.Map
+import grizzled.slf4j.Logging
+import org.apache.samza.Partition
+import org.apache.samza.system.SystemConsumer
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.SystemStreamPartitionIterator
+import org.apache.samza.task.MessageCollector
+import org.apache.samza.util.Util
+
+/** Path helpers for locating store directories on disk. */
+object TaskStorageManager {
+  /** Directory of a store: storeBaseDir/storeName. */
+  def getStoreDir(storeBaseDir: File, storeName: String) =
+    new File(storeBaseDir, storeName)
+
+  /** Directory of one partition of a store: storeBaseDir/storeName/partitionId. */
+  def getStorePartitionDir(storeBaseDir: File, storeName: String, partition: Partition) = {
+    val relativePath = storeName + File.separator + partition.getPartitionId
+    new File(storeBaseDir, relativePath)
+  }
+}
+
+/**
+ * Manages all the storage engines that belong to one task (one partition).
+ *
+ * @param partition the partition this task is responsible for
+ * @param taskStores storage engines by store name
+ * @param storeConsumers consumers, by store name, used to read change logs
+ * @param changeLogSystemStreams change log stream by store name; stores without
+ *        an entry here are not restored
+ * @param storeBaseDir directory under which per-store, per-partition directories live
+ */
+class TaskStorageManager(
+  partition: Partition,
+  taskStores: Map[String, StorageEngine] = Map(),
+  storeConsumers: Map[String, SystemConsumer] = Map(),
+  changeLogSystemStreams: Map[String, SystemStream] = Map(),
+  storeBaseDir: File = new File(System.getProperty("user.dir"), "state")) extends Logging {
+
+  /** Looks up a storage engine by store name. */
+  def apply(storageEngineName: String) = taskStores(storageEngineName)
+
+  /**
+   * Wipes the on-disk store directories, then replays each change log into
+   * its store. The consumers are started before and stopped after the replay.
+   */
+  def init(collector: MessageCollector): Unit = {
+    cleanBaseDirs
+    startConsumers
+    restoreStores(collector)
+    stopConsumers
+  }
+
+  /** Deletes and re-creates this partition's directory for every store. */
+  private def cleanBaseDirs: Unit = {
+    debug("Cleaning base directories for stores.")
+
+    for (storeName <- taskStores.keys) {
+      val partitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, partition)
+
+      debug("Cleaning %s for store %s." format (partitionDir, storeName))
+
+      Util.rm(partitionDir)
+      partitionDir.mkdirs
+    }
+  }
+
+  /** Registers this partition of every change log with its consumer, then starts all consumers. */
+  private def startConsumers: Unit = {
+    debug("Starting consumers for stores.")
+
+    changeLogSystemStreams.foreach {
+      case (storeName, systemStream) =>
+        val changeLogPartition = new SystemStreamPartition(systemStream, partition)
+        val storeConsumer = storeConsumers(storeName)
+
+        debug("Registering consumer for system stream partition %s." format changeLogPartition)
+
+        // A null offset is passed on registration.
+        storeConsumer.register(changeLogPartition, null)
+    }
+
+    for (storeConsumer <- storeConsumers.values) {
+      storeConsumer.start
+    }
+  }
+
+  /** Hands each store that has a change log an iterator over that change log partition. */
+  private def restoreStores(collector: MessageCollector): Unit = {
+    debug("Restoring stores.")
+
+    taskStores.foreach {
+      case (storeName, store) =>
+        changeLogSystemStreams.get(storeName) match {
+          case Some(systemStream) =>
+            val changeLogPartition = new SystemStreamPartition(systemStream, partition)
+            val storeConsumer = storeConsumers(storeName)
+            store.restore(new SystemStreamPartitionIterator(storeConsumer, changeLogPartition))
+          case None =>
+            // No change log configured for this store, so there is nothing to replay.
+        }
+    }
+  }
+
+  /** Stops every store consumer. */
+  private def stopConsumers: Unit = {
+    debug("Stopping consumers for stores.")
+
+    for (storeConsumer <- storeConsumers.values) {
+      storeConsumer.stop
+    }
+  }
+
+  /** Flushes every storage engine. */
+  def flush(): Unit = {
+    debug("Flushing stores.")
+
+    for (store <- taskStores.values) {
+      store.flush
+    }
+  }
+
+  /** Stops every storage engine. */
+  def stop(): Unit = {
+    debug("Stopping stores.")
+
+    for (store <- taskStores.values) {
+      store.stop
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/system/DefaultPicker.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/DefaultPicker.scala b/samza-core/src/main/scala/org/apache/samza/system/DefaultPicker.scala
new file mode 100644
index 0000000..8bad75c
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/DefaultPicker.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system
+
+import java.util.ArrayDeque
+
+/**
+ * A picker that returns envelopes in the order they were offered (FIFO),
+ * backed by an ArrayDeque.
+ */
+class DefaultPicker extends IncomingMessageEnvelopePicker {
+  var q = new ArrayDeque[IncomingMessageEnvelope]()
+
+  /** Appends the envelope to the tail of the queue. */
+  def update(envelope: IncomingMessageEnvelope) = {
+    q.add(envelope)
+  }
+
+  /** Removes and returns the head of the queue, or null when the queue is empty. */
+  def pick = {
+    q.poll
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
new file mode 100644
index 0000000..2e6f3b8
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.Queue
+import grizzled.slf4j.Logging
+import org.apache.samza.serializers.SerdeManager
+
+/**
+ * Polls a set of system consumers, buffers the deserialized envelopes per
+ * system stream partition, and hands them to a picker. A partition gets a new
+ * envelope in the picker only after the picker's previous envelope for that
+ * partition has been picked.
+ *
+ * picker: decides which buffered envelope is returned next by pick.
+ * consumers: system consumers keyed by system name.
+ * serdeManager: used to deserialize every envelope as it is received.
+ * maxMsgsPerStreamPartition: initial fetch budget for each registered partition.
+ * noNewMessagesTimeout: timeout passed to consumer.poll when the previous
+ * pick returned nothing (units are whatever SystemConsumer.poll expects -
+ * TODO confirm).
+ */
+class SystemConsumers(
+ picker: IncomingMessageEnvelopePicker,
+ consumers: Map[String, SystemConsumer],
+ serdeManager: SerdeManager,
+ maxMsgsPerStreamPartition: Int = 1000,
+ noNewMessagesTimeout: Long = 10) extends Logging {
+
+ // TODO add metrics
+
+ // Envelopes received from consumers but not yet given to the picker.
+ var unprocessedMessages = Map[SystemStreamPartition, Queue[IncomingMessageEnvelope]]()
+ // Partitions for which the picker currently holds no envelope.
+ var neededByPicker = Set[SystemStreamPartition]()
+ // Remaining fetch budget per partition: starts at maxMsgsPerStreamPartition,
+ // drops by one per envelope received, rises by one per envelope given to the picker.
+ var fetchMap = Map[SystemStreamPartition, java.lang.Integer]()
+ // Timeout used for the next round of polls; updated on every pick.
+ var timeout = noNewMessagesTimeout
+
+ debug("Got stream consumers: %s" format consumers)
+ debug("Got max messages per stream: %s" format maxMsgsPerStreamPartition)
+ debug("Got no new message timeout: %s" format noNewMessagesTimeout)
+
+ /** Starts every underlying consumer. */
+ def start {
+ debug("Starting consumers.")
+
+ consumers.values.foreach(_.start)
+ }
+
+ /** Stops every underlying consumer. */
+ def stop {
+ debug("Stopping consumers.")
+
+ consumers.values.foreach(_.stop)
+ }
+
+ /**
+ * Sets up the bookkeeping for a partition and registers it, with the given
+ * offset, on the consumer of the partition's system.
+ */
+ def register(systemStreamPartition: SystemStreamPartition, lastReadOffset: String) {
+ debug("Registering stream: %s, %s" format (systemStreamPartition, lastReadOffset))
+
+ neededByPicker += systemStreamPartition
+ fetchMap += systemStreamPartition -> maxMsgsPerStreamPartition
+ unprocessedMessages += systemStreamPartition -> Queue[IncomingMessageEnvelope]()
+ consumers(systemStreamPartition.getSystem).register(systemStreamPartition, lastReadOffset)
+ }
+
+ /**
+ * Asks the picker for the next envelope, then polls all consumers and tops
+ * the picker up. Returns whatever the picker returned, which may be null.
+ */
+ def pick = {
+ val picked = picker.pick
+
+ if (picked == null) {
+ debug("Picker returned null.")
+
+ // Allow blocking if the picker didn't pick a message.
+ timeout = noNewMessagesTimeout
+ } else {
+ debug("Picker returned an incoming message envelope: %s" format picked)
+
+ // Don't block if we have a message to process.
+ timeout = 0
+
+ // Ok to give the picker a new message from this stream.
+ neededByPicker += picked.getSystemStreamPartition
+ }
+
+ refresh
+ picked
+ }
+
+ /**
+ * Polls every system, then gives the picker one buffered envelope for each
+ * partition it is waiting on, when one is available.
+ */
+ private def refresh {
+ debug("Refreshing picker with new messages.")
+
+ // Poll every system for new messages.
+ consumers.keys.foreach(poll(_))
+
+ // Update the picker.
+ neededByPicker.foreach(systemStreamPartition =>
+ // If we have messages for a stream that the picker needs, then update.
+ // A budget below the maximum means at least one envelope is buffered.
+ if (fetchMap(systemStreamPartition).intValue < maxMsgsPerStreamPartition) {
+ picker.update(unprocessedMessages(systemStreamPartition).dequeue)
+ fetchMap += systemStreamPartition -> (fetchMap(systemStreamPartition).intValue + 1)
+ neededByPicker -= systemStreamPartition
+ })
+ }
+
+ /**
+ * Polls one system's consumer with that system's slice of the fetch map,
+ * then deserializes and buffers every envelope that came back.
+ */
+ private def poll(systemName: String) = {
+ debug("Polling system consumer: %s" format systemName)
+
+ val consumer = consumers(systemName)
+
+ debug("Filtering for system: %s, %s" format (systemName, fetchMap))
+
+ val systemFetchMap = fetchMap.filterKeys(_.getSystem.equals(systemName))
+
+ debug("Fetching: %s" format systemFetchMap)
+
+ val incomingEnvelopes = consumer.poll(systemFetchMap, timeout)
+
+ debug("Got incoming message envelopes: %s" format incomingEnvelopes)
+
+ // We have new un-processed envelopes, so update maps accordingly.
+ incomingEnvelopes.foreach(envelope => {
+ val systemStreamPartition = envelope.getSystemStreamPartition
+
+ debug("Got message for: %s, %s" format (systemStreamPartition, envelope))
+
+ fetchMap += systemStreamPartition -> (fetchMap(systemStreamPartition).intValue - 1)
+
+ debug("Updated fetch map for: %s, %s" format (systemStreamPartition, fetchMap))
+
+ unprocessedMessages(envelope.getSystemStreamPartition).enqueue(serdeManager.fromBytes(envelope))
+
+ debug("Updated unprocessed messages for: %s, %s" format (systemStreamPartition, unprocessedMessages))
+ })
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
new file mode 100644
index 0000000..738d04f
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system
+
+import org.apache.samza.serializers.SerdeManager
+
+/**
+ * Fans lifecycle calls out to every system producer and routes each outgoing
+ * envelope, after serialization, to the producer of the envelope's system.
+ */
+class SystemProducers(
+  producers: Map[String, SystemProducer],
+  serdeManager: SerdeManager) {
+
+  // TODO add metrics and logging
+
+  /** Starts every producer. */
+  def start: Unit = {
+    for (producer <- producers.values) producer.start
+  }
+
+  /** Stops every producer. */
+  def stop: Unit = {
+    for (producer <- producers.values) producer.stop
+  }
+
+  /** Registers the source with every producer. */
+  def register(source: String): Unit = {
+    for (producer <- producers.values) producer.register(source)
+  }
+
+  /** Commits the source on every producer. */
+  def commit(source: String): Unit = {
+    for (producer <- producers.values) producer.commit(source)
+  }
+
+  /** Serializes the envelope and sends it through the producer for its system. */
+  def send(source: String, envelope: OutgoingMessageEnvelope): Unit = {
+    // Resolve the producer first so that an unknown system fails before any serde work.
+    val producer = producers(envelope.getSystemStream.getSystem)
+    producer.send(source, serdeManager.toBytes(envelope))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/task/ReadableCollector.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/task/ReadableCollector.scala b/samza-core/src/main/scala/org/apache/samza/task/ReadableCollector.scala
new file mode 100644
index 0000000..0ec79da
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/task/ReadableCollector.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task
+
+import scala.collection.mutable
+
+import org.apache.samza.system.OutgoingMessageEnvelope
+
+/**
+ * A MessageCollector that keeps every sent envelope in memory, in send
+ * order, so that it can be read back later.
+ */
+class ReadableCollector extends MessageCollector {
+  var envelopes = new mutable.ArrayBuffer[OutgoingMessageEnvelope]()
+
+  /** Records the envelope at the end of the buffer. */
+  def send(envelope: OutgoingMessageEnvelope): Unit = {
+    envelopes += envelope
+  }
+
+  /** The buffer of envelopes recorded so far (the live buffer, not a copy). */
+  def getEnvelopes = {
+    envelopes
+  }
+
+  /** Discards all recorded envelopes. */
+  def reset() = {
+    envelopes.clear
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
new file mode 100644
index 0000000..aaf631e
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task
+
+/**
+ * A TaskCoordinator that only remembers, in memory, whether a commit or a
+ * shutdown has been requested.
+ */
+class ReadableCoordinator extends TaskCoordinator {
+  var commitRequested = false
+  var shutdownRequested = false
+
+  /** Flags that a commit was requested. */
+  def commit: Unit = {
+    commitRequested = true
+  }
+
+  /** True once commit has been called. */
+  def isCommitRequested = {
+    commitRequested
+  }
+
+  /** Flags that a shutdown was requested. */
+  def shutdown: Unit = {
+    shutdownRequested = true
+  }
+
+  /** True once shutdown has been called. */
+  def isShutdownRequested = {
+    shutdownRequested
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala b/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala
new file mode 100644
index 0000000..04e67a2
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util
+
+import java.util.concurrent.ThreadFactory
+
+/** A ThreadFactory whose threads are all marked as daemon threads. */
+class DaemonThreadFactory extends ThreadFactory {
+  /** Wraps the runnable in a new, not yet started, daemon thread. */
+  def newThread(r: Runnable) = {
+    val daemonThread = new Thread(r)
+    daemonThread.setDaemon(true)
+    daemonThread
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
new file mode 100644
index 0000000..8386324
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util
+
+import java.io.File
+import java.net.InetAddress
+import java.net.UnknownHostException
+import java.util.Random
+import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
+import org.apache.samza.config.Config
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.TaskConfig.Config2Task
+import scala.collection.JavaConversions._
+import java.util.concurrent.ThreadFactory
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.system.SystemStream
+
+/**
+ * Miscellaneous helpers: string escaping, random numbers, recursive file
+ * removal, reflective instantiation, and system stream name handling.
+ */
+object Util extends Logging {
+ // Shared random number generator used by randomBetween.
+ val random = new Random
+
+ /**
+ * Make an environment variable string safe to pass, by prefixing every
+ * double quote and single quote with a backslash. No other character is
+ * escaped.
+ */
+ def envVarEscape(str: String) = str.replace("\"", "\\\"").replace("'", "\\'")
+
+ /**
+ * Get a random number >= startInclusive, and < endExclusive.
+ * java.util.Random.nextInt rejects a non-positive bound, so endExclusive
+ * must be greater than startInclusive.
+ */
+ def randomBetween(startInclusive: Int, endExclusive: Int) =
+ startInclusive + random.nextInt(endExclusive - startInclusive)
+
+ /**
+ * Recursively remove a directory (or file), and all sub-directories. Equivalent
+ * to rm -rf. A null file is ignored, and the results of the individual
+ * delete calls are not checked.
+ */
+ def rm(file: File) {
+ if (file == null) {
+ return
+ } else if (file.isDirectory) {
+ val files = file.listFiles()
+ // listFiles may return null, in which case only the directory itself is deleted.
+ if (files != null) {
+ for (f <- files)
+ rm(f)
+ }
+ file.delete()
+ } else {
+ file.delete()
+ }
+ }
+
+ /**
+ * Instantiate a class instance from a given className, using the class's
+ * no-argument constructor. The cast to T is not checked here.
+ */
+ def getObj[T](className: String) = {
+ Class
+ .forName(className)
+ .newInstance
+ .asInstanceOf[T]
+ }
+
+ /**
+ * Uses config to create a SystemAdmin for every configured system, asks the
+ * matching admin for the partitions of each input stream, and returns all of
+ * those partitions combined into one Set (not a count).
+ *
+ * Throws SamzaException when a configured system has no system factory, and
+ * IllegalArgumentException when an input stream's system has no admin.
+ */
+ def getMaxInputStreamPartitions(config: Config) = {
+ val inputStreams = config.getInputStreams
+ val systemNames = config.getSystemNames
+ val systemAdmins = systemNames.map(systemName => {
+ val systemFactoryClassName = config
+ .getSystemFactory(systemName)
+ .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
+ val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+ val systemAdmin = systemFactory.getAdmin(systemName, config)
+ (systemName, systemAdmin)
+ }).toMap
+ inputStreams.flatMap(systemStream => {
+ systemAdmins.get(systemStream.getSystem) match {
+ case Some(sysAdmin) => sysAdmin.getPartitions(systemStream.getStream)
+ case None => throw new IllegalArgumentException("Could not find a stream admin for system '" + systemStream.getSystem + "'")
+ }
+ }).toSet
+ }
+
+ /**
+ * Returns a SystemStream object based on the system stream name given. For
+ * example, kafka.topic would return new SystemStream("kafka", "topic").
+ * The name is split at the first '.', so the stream part may itself contain
+ * dots. Throws IllegalArgumentException when the name contains no '.'.
+ */
+ def getSystemStreamFromNames(systemStreamNames: String): SystemStream = {
+ val idx = systemStreamNames.indexOf('.')
+ if(idx < 0)
+ throw new IllegalArgumentException("No '.' in stream name '" + systemStreamNames + "'. Stream names should be in the form 'system.stream'")
+ new SystemStream(systemStreamNames.substring(0, idx), systemStreamNames.substring(idx + 1, systemStreamNames.length))
+ }
+
+ /**
+ * Returns the "system.stream" name of the given SystemStream. For example,
+ * new SystemStream("kafka", "topic") would return kafka.topic.
+ */
+ def getNameFromSystemStream(systemStream: SystemStream) = {
+ systemStream.getSystem + "." + systemStream.getStream
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/test/resources/test.properties
----------------------------------------------------------------------
diff --git a/samza-core/src/test/resources/test.properties b/samza-core/src/test/resources/test.properties
new file mode 100644
index 0000000..9348c7d
--- /dev/null
+++ b/samza-core/src/test/resources/test.properties
@@ -0,0 +1,24 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+job.factory.class=org.apache.samza.job.MockJobFactory
+job.name=test-job
+foo=bar
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
new file mode 100644
index 0000000..50d9a05
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.file
+
+import java.io.File
+import scala.collection.JavaConversions._
+import java.util.Random
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.SamzaException
+import org.apache.samza.Partition
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.system.SystemStream
+
+ /**
+  * Tests for FileSystemCheckpointManager. The JVM temp directory
+  * (java.io.tmpdir) is used as the checkpoint root.
+  */
+ class TestFileSystemCheckpointManager {
+   val checkpointRoot = System.getProperty("java.io.tmpdir")
+
+   /**
+    * Reads the last checkpoint for partition 1, which this test class never
+    * writes, and expects null.
+    */
+   @Test
+   def testReadForCheckpointFileThatDoesNotExistShouldReturnNull {
+     val cpm = new FileSystemCheckpointManager("some-job-name", new File(checkpointRoot))
+     assertNull(cpm.readLastCheckpoint(new Partition(1)))
+   }
+
+   /**
+    * Writes a checkpoint for partition 2 and expects to read back an equal one.
+    */
+   @Test
+   def testReadForCheckpointFileThatDoesExistShouldReturnProperCheckpoint {
+     val cpm = new FileSystemCheckpointManager("some-job-name", new File(checkpointRoot))
+     val partition = new Partition(2)
+     val cp = new Checkpoint(Map(
+       new SystemStream("a", "b") -> "c",
+       new SystemStream("a", "c") -> "d",
+       new SystemStream("b", "d") -> "e"))
+     cpm.start
+     try {
+       cpm.writeCheckpoint(partition, cp)
+       // assertEquals reports both values on mismatch and fails cleanly
+       // (instead of throwing a NullPointerException) if nothing was read back.
+       assertEquals(cp, cpm.readLastCheckpoint(partition))
+     } finally {
+       // Stop the manager even when the write or the assertion fails.
+       cpm.stop
+     }
+   }
+
+   /**
+    * Points the manager at a root directory with a random suffix (expected not
+    * to exist) and expects start to throw a SamzaException.
+    */
+   @Test
+   def testMissingRootDirectoryShouldFailOnManagerCreation {
+     val cpm = new FileSystemCheckpointManager("some-job-name", new File(checkpointRoot + new Random().nextInt))
+     try {
+       cpm.start
+       fail("Expected an exception since root directory for fs checkpoint manager doesn't exist.")
+     } catch {
+       case e: SamzaException => // this is expected
+     } finally {
+       // Runs on every path, including when fail() above fires.
+       cpm.stop
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala b/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala
new file mode 100644
index 0000000..f254741
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config.factories
+import java.net.URI
+import java.io.File
+import org.apache.samza.SamzaException
+import org.junit.Assert._
+import org.junit.Test
+
+ /**
+  * Tests for PropertiesConfigFactory: it should load a local properties file
+  * and reject a non-local (hdfs) URI.
+  */
+ class TestPropertiesConfigFactory {
+   val factory = new PropertiesConfigFactory()
+
+   /**
+    * Loads src/test/resources/test.properties, resolved against the current
+    * working directory, and checks the value stored under the "foo" key.
+    */
+   @Test
+   def testCanReadPropertiesConfigFiles {
+     val config = factory.getConfig(URI.create("file://%s/src/test/resources/test.properties" format new File(".").getCanonicalPath))
+     // JUnit's assertEquals cannot be elided and prints expected/actual on failure.
+     assertEquals("bar", config.get("foo"))
+   }
+
+   /**
+    * Expects a SamzaException when the factory is handed an hdfs:// URI.
+    */
+   @Test
+   def testCanNotReadNonLocalPropertiesConfigFiles {
+     try {
+       factory.getConfig(URI.create("hdfs://foo"))
+       fail("should have gotten a samza exception")
+     } catch {
+       case e: SamzaException => // expected; nothing to do
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
new file mode 100644
index 0000000..21d8a78
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job
+import java.io.File
+import org.apache.samza.config.Config
+import org.junit.Assert._
+import org.junit.Test
+
+ object TestJobRunner {
+   // Incremented each time the job created by MockJobFactory is submitted.
+   var processCount = 0
+ }
+
+ /**
+  * Runs JobRunner.main against src/test/resources/test.properties and checks
+  * that exactly one job was submitted through MockJobFactory.
+  */
+ class TestJobRunner {
+   @Test
+   def testJobRunnerWorks {
+     // processCount is shared mutable state, so compare against the value seen
+     // before the run instead of assuming the counter started at zero.
+     val submitsBefore = TestJobRunner.processCount
+     JobRunner.main(Array(
+       "--config-factory",
+       "org.apache.samza.config.factories.PropertiesConfigFactory",
+       "--config-path",
+       "file://%s/src/test/resources/test.properties" format new File(".").getCanonicalPath))
+     assertEquals(submitsBefore + 1, TestJobRunner.processCount)
+   }
+ }
+
+ /**
+  * StreamJobFactory stub whose job only counts submissions in
+  * TestJobRunner.processCount and always reports SuccessfulFinish.
+  */
+ class MockJobFactory extends StreamJobFactory {
+   def getJob(config: Config): StreamJob = new StreamJob {
+     def submit() = {
+       // Record the submission so the test can observe it.
+       TestJobRunner.processCount += 1
+       this
+     }
+
+     def kill() = this
+
+     def waitForFinish(timeoutMs: Long) = ApplicationStatus.SuccessfulFinish
+
+     def waitForStatus(status: ApplicationStatus, timeoutMs: Long) = status
+
+     def getStatus() = ApplicationStatus.SuccessfulFinish
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
new file mode 100644
index 0000000..d56024d
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.local;
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.job.ApplicationStatus
+
+ /**
+  * Tests for ProcessJob, driven by the external `sleep` command.
+  */
+ class TestProcessJob {
+   /**
+    * Submits a one-second sleep and waits for it; passes as long as nothing throws.
+    */
+   @Test
+   def testProcessJobShouldFinishOnItsOwn {
+     val builder = new ProcessBuilder("sleep", "1")
+     val job = new ProcessJob(builder)
+     job.submit
+     job.waitForFinish(999999)
+   }
+
+   /**
+    * Submits a very long sleep, kills it, and expects UnsuccessfulFinish.
+    */
+   @Test
+   def testProcessJobKillShouldWork {
+     val builder = new ProcessBuilder("sleep", "999999999")
+     val job = new ProcessJob(builder)
+     job.submit
+     // Short wait while the job is presumably still running, before killing it.
+     job.waitForFinish(500)
+     job.kill
+     job.waitForFinish(999999)
+     // JUnit's assertEquals cannot be elided and prints expected/actual on failure.
+     assertEquals(ApplicationStatus.UnsuccessfulFinish, job.waitForFinish(999999999))
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
new file mode 100644
index 0000000..7d45889
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.local
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.job.ApplicationStatus
+
+ /**
+  * Tests for ThreadJob, using small in-line Runnables as the job body.
+  */
+ class TestThreadJob {
+   /**
+    * Submits a Runnable that returns immediately and waits for it; passes as
+    * long as nothing throws.
+    */
+   @Test
+   def testThreadJobShouldFinishOnItsOwn {
+     val job = new ThreadJob(new Runnable {
+       override def run {
+       }
+     })
+     job.submit
+     job.waitForFinish(999999)
+   }
+
+   /**
+    * Submits a Runnable that sleeps for a long time, kills the job, and expects
+    * UnsuccessfulFinish.
+    */
+   @Test
+   def testThreadJobKillShouldWork {
+     val job = new ThreadJob(new Runnable {
+       override def run {
+         Thread.sleep(999999)
+       }
+     })
+     job.submit
+     // Short wait while the job is presumably still sleeping, before killing it.
+     job.waitForFinish(500)
+     job.kill
+     job.waitForFinish(999999)
+     // JUnit's assertEquals cannot be elided and prints expected/actual on failure.
+     assertEquals(ApplicationStatus.UnsuccessfulFinish, job.waitForFinish(999999999))
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala b/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
new file mode 100644
index 0000000..f5594d0
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics
+
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.config.MapConfig
+import grizzled.slf4j.Logging
+import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}
+import java.io.IOException
+
+
+ /**
+  * Checks that a JmxServer can be created and that a JMX client can connect
+  * to the URL it reports.
+  */
+ class TestJmxServer extends Logging {
+   /**
+    * Creates a JmxServer, connects to its JMX URL, and expects the remote
+    * MBean server to report at least one MBean. The server is always stopped.
+    */
+   @Test
+   def serverStartsUp {
+     // Constructed outside the try: if construction throws there is nothing to stop.
+     val jmxServer = new JmxServer
+
+     try {
+       // Use the Logging mix-in rather than writing straight to stdout.
+       info("Got jmxServer on port " + jmxServer.getPort)
+
+       val jmxURL = new JMXServiceURL(jmxServer.getJmxUrl)
+       var jmxConnector: JMXConnector = null
+       try {
+         jmxConnector = JMXConnectorFactory.connect(jmxURL, null)
+         val connection = jmxConnector.getMBeanServerConnection()
+         assertTrue("Connected but mbean count is somehow 0", connection.getMBeanCount.intValue() > 0)
+       } catch {
+         // Keep the underlying cause in the failure message instead of dropping it.
+         case ioe: IOException => fail("Couldn't open connection to local JMX server: " + ioe)
+       } finally {
+         // The connector stays null when connect itself failed.
+         if (jmxConnector != null) jmxConnector.close
+       }
+     } finally {
+       jmxServer.stop
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala b/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
new file mode 100644
index 0000000..8827697
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter
+
+import org.junit.Assert._
+import org.junit.AfterClass
+import org.junit.BeforeClass
+import org.junit.Test
+import scala.collection.JavaConversions._
+import org.apache.samza.task.TaskContext
+import javax.management.remote.JMXConnectorFactory
+import org.apache.samza.metrics.MetricsRegistryMap
+import javax.management.remote.JMXConnectorServerFactory
+import javax.management.remote.JMXConnectorServer
+import java.rmi.registry.LocateRegistry
+import javax.management.remote.JMXServiceURL
+import org.apache.samza.config.MapConfig
+import java.lang.management.ManagementFactory
+import org.apache.samza.Partition
+import javax.management.ObjectName
+import org.apache.samza.metrics.JvmMetrics
+
+ /**
+  * Class-level fixture for TestJmxReporter: creates an RMI registry and starts
+  * a JMX connector server for the platform MBean server, reachable at `url`.
+  */
+ object TestJmxReporter {
+   val port = 4500
+   val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:%d/jmxapitestrmi" format port)
+   var cs: JMXConnectorServer = null
+
+   @BeforeClass
+   def beforeSetupServers {
+     // Use the shared constant so the registry port cannot drift away from the
+     // port embedded in `url`.
+     LocateRegistry.createRegistry(port)
+     val mbs = ManagementFactory.getPlatformMBeanServer()
+     cs = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs)
+     cs.start
+   }
+
+   @AfterClass
+   def afterCleanLogDirs {
+     // cs stays null if setup failed before the connector server was created.
+     if (cs != null) {
+       cs.stop
+     }
+   }
+ }
+
+ /**
+  * Registers a metrics registry with a JMX reporter and reads one JVM metric
+  * back through a remote JMX connection.
+  */
+ class TestJmxReporter {
+   import TestJmxReporter.url
+
+   /**
+    * Expects the "MemNonHeapUsedM" attribute of the "test" group to be
+    * readable over JMX and greater than zero after one JvmMetrics run.
+    */
+   @Test
+   def testJmxReporter {
+     val registry = new MetricsRegistryMap
+     val jvm = new JvmMetrics("test", registry)
+     val reporter = new JmxReporterFactory().getMetricsReporter("", "", new MapConfig(Map[String, String]()))
+
+     reporter.register("test", registry)
+     reporter.start
+     try {
+       // Run the JVM metrics collection once, synchronously, on this thread.
+       jvm.run
+
+       val connector = JMXConnectorFactory.connect(url)
+       try {
+         val mbserver = connector.getMBeanServerConnection
+         val stateViaJMX = mbserver.getAttribute(new ObjectName("test:type=test,name=MemNonHeapUsedM"), "Value").asInstanceOf[Float]
+
+         assertTrue(stateViaJMX > 0)
+       } finally {
+         // Release the client connection even when the lookup or assertion fails.
+         connector.close
+       }
+     } finally {
+       // Stop the reporter on every path, not only when the assertion passes.
+       reporter.stop
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
new file mode 100644
index 0000000..c45ed9b
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task
+
+import org.junit.Assert._
+import org.junit.Test
+
+ /**
+  * Tests for ReadableCoordinator's commit and shutdown request flags.
+  */
+ class TestReadableCoordinator {
+   /**
+    * The commit flag starts unset, is set by commit, and stays set when commit
+    * is called a second time.
+    */
+   @Test
+   def testCommit {
+     val coord = new ReadableCoordinator
+     // JUnit assertions are used because Predef.assert can be elided at compile time.
+     assertFalse(coord.isCommitRequested)
+     coord.commit
+     assertTrue(coord.isCommitRequested)
+     coord.commit
+     assertTrue(coord.isCommitRequested)
+   }
+
+   /**
+    * The shutdown flag starts unset, is set by shutdown, and stays set when
+    * shutdown is called a second time.
+    */
+   @Test
+   def testShutdown {
+     val coord = new ReadableCoordinator
+     assertFalse(coord.isShutdownRequested)
+     coord.shutdown
+     assertTrue(coord.isShutdownRequested)
+     coord.shutdown
+     assertTrue(coord.isShutdownRequested)
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/lib/kafka_2.8.1-0.8.1-SNAPSHOT-test.jar
----------------------------------------------------------------------
diff --git a/samza-kafka/lib/kafka_2.8.1-0.8.1-SNAPSHOT-test.jar b/samza-kafka/lib/kafka_2.8.1-0.8.1-SNAPSHOT-test.jar
new file mode 100644
index 0000000..ce88269
Binary files /dev/null and b/samza-kafka/lib/kafka_2.8.1-0.8.1-SNAPSHOT-test.jar differ