You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2013/08/12 18:21:38 UTC
[08/15] initial import.
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/FileSystemCheckpointManagerConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/FileSystemCheckpointManagerConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/FileSystemCheckpointManagerConfig.scala
new file mode 100644
index 0000000..707ea59
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/FileSystemCheckpointManagerConfig.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+/**
+ * Configuration keys and the implicit wrapper for the file system
+ * checkpoint manager.
+ */
+object FileSystemCheckpointManagerConfig {
+  // file system checkpoint manager config constants
+  val CHECKPOINT_MANAGER_ROOT = "task.checkpoint.path" // key read by getFileSystemCheckpointRoot (the checkpoint root path)
+
+  /**
+   * Lets a plain Config be used wherever a FileSystemCheckpointManagerConfig
+   * is expected. The result type is spelled out so the conversion does not
+   * depend on type inference.
+   */
+  implicit def Config2FSCP(config: Config): FileSystemCheckpointManagerConfig =
+    new FileSystemCheckpointManagerConfig(config)
+}
+
+/**
+ * Config view exposing the file system checkpoint manager settings.
+ */
+class FileSystemCheckpointManagerConfig(config: Config) extends ScalaMapConfig(config) {
+  import FileSystemCheckpointManagerConfig.CHECKPOINT_MANAGER_ROOT
+
+  /** Value of `task.checkpoint.path`, or None when the key is not configured. */
+  def getFileSystemCheckpointRoot: Option[String] = getOption(CHECKPOINT_MANAGER_ROOT)
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
new file mode 100644
index 0000000..150ea93
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+/**
+ * Configuration keys and the implicit wrapper for job-level settings.
+ */
+object JobConfig {
+  // job config constants
+  val STREAM_JOB_FACTORY_CLASS = "job.factory.class" // streaming.job_factory_class
+  val CONFIG_REWRITERS = "job.config.rewriters" // CSV list of config rewriter classes to apply
+  val CONFIG_REWRITER_CLASS = "job.config.rewriter.%s.class" // %s is filled in with a rewriter name
+  val JOB_NAME = "job.name" // streaming.job_name
+  val JOB_ID = "job.id" // streaming.job_id
+
+  /** Lets a plain Config be used wherever a JobConfig is expected. */
+  implicit def Config2Job(config: Config): JobConfig =
+    new JobConfig(config)
+}
+
+/**
+ * Config view exposing the job-level settings. Every getter returns None
+ * when its key is not present in the wrapped config.
+ */
+class JobConfig(config: Config) extends ScalaMapConfig(config) {
+  import JobConfig.{ CONFIG_REWRITERS, CONFIG_REWRITER_CLASS, JOB_ID, JOB_NAME, STREAM_JOB_FACTORY_CLASS }
+
+  /** Value of `job.name`. */
+  def getName: Option[String] = getOption(JOB_NAME)
+
+  /** Value of `job.factory.class`. */
+  def getStreamJobFactoryClass: Option[String] = getOption(STREAM_JOB_FACTORY_CLASS)
+
+  /** Value of `job.id`. */
+  def getJobId: Option[String] = getOption(JOB_ID)
+
+  /** Raw (unsplit) value of `job.config.rewriters`. */
+  def getConfigRewriters: Option[String] = getOption(CONFIG_REWRITERS)
+
+  /** Value of `job.config.rewriter.<name>.class` for the given rewriter name. */
+  def getConfigRewriterClass(name: String): Option[String] = {
+    val key = CONFIG_REWRITER_CLASS.format(name)
+    getOption(key)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
new file mode 100644
index 0000000..f7a11c5
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+import scala.collection.JavaConversions._
+
+/**
+ * Configuration keys and the implicit wrapper for metrics reporter settings.
+ */
+object MetricsConfig {
+  // metrics config constants
+  val METRICS_REPORTERS = "metrics.reporters" // comma-separated reporter names (see getMetricReporterNames)
+  val METRICS_REPORTER_FACTORY = "metrics.reporter.%s.class" // %s is a reporter name
+  val METRICS_SNAPSHOT_REPORTER_STREAM = "metrics.reporter.%s.stream" // %s is a reporter name
+
+  /** Lets a plain Config be used wherever a MetricsConfig is expected. */
+  implicit def Config2Metrics(config: Config): MetricsConfig =
+    new MetricsConfig(config)
+}
+
+/**
+ * Config view exposing the metrics reporter settings.
+ */
+class MetricsConfig(config: Config) extends ScalaMapConfig(config) {
+  /** Raw (unsplit) value of `metrics.reporters`, or None when not configured. */
+  def getMetricsReporters(): Option[String] = getOption(MetricsConfig.METRICS_REPORTERS)
+
+  /** Value of `metrics.reporter.<name>.class` for the given reporter name. */
+  def getMetricsFactoryClass(name: String): Option[String] = getOption(MetricsConfig.METRICS_REPORTER_FACTORY format name)
+
+  /** Value of `metrics.reporter.<name>.stream` for the given reporter name. */
+  def getMetricsReporterStream(name: String): Option[String] = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM format name)
+
+  /**
+   * Returns the list of reporter names from the comma-separated
+   * `metrics.reporters` value. Each name is trimmed. Blank entries (such as
+   * the middle one in "a, ,b", or a whitespace-only value) are dropped so
+   * callers never receive an empty reporter name. Returns an empty list when
+   * the key is missing or empty.
+   */
+  def getMetricReporterNames(): List[String] =
+    getMetricsReporters()
+      .map(csv => csv.split(",").map(name => name.trim).filter(name => name.nonEmpty).toList)
+      .getOrElse(List[String]())
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/ScalaMapConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/ScalaMapConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ScalaMapConfig.scala
new file mode 100644
index 0000000..5fb1f52
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/ScalaMapConfig.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+import org.apache.samza.SamzaException
+
+/**
+ * A MapConfig with Scala-friendly lookups: Option-returning, defaulting, and
+ * failing variants of a key lookup.
+ */
+class ScalaMapConfig(config: Config) extends MapConfig(config) {
+  /** Value stored under `k`, or `els` when the key is absent. */
+  def getOrElse(k: String, els: String) = getOption(k) match {
+    case Some(value) => value
+    case None => els
+  }
+
+  /** Some(value) when `k` is present according to containsKey, otherwise None. */
+  def getOption(k: String): Option[String] = {
+    if (!containsKey(k)) None
+    else Some(config.get(k))
+  }
+
+  /**
+   * Value stored under `k`. Throws a SamzaException when the key is absent;
+   * `msg`, when non-null, is appended to the error text.
+   */
+  def getExcept(k: String, msg: String = null): String =
+    getOption(k).getOrElse {
+      val error = Option(msg)
+        .map(detail => "Missing required configuration '%s': %s".format(k, detail))
+        .getOrElse("Missing required configuration '%s'".format(k))
+      throw new SamzaException(error)
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala
new file mode 100644
index 0000000..0c026e6
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+import scala.collection.JavaConversions._
+
+/**
+ * Configuration keys and the implicit wrapper for the serializer registry.
+ */
+object SerializerConfig {
+  // serializer config constants
+  val SERIALIZER_PREFIX = "serializers.registry.%s" // %s is a serde name
+  val SERDE = "serializers.registry.%s.class" // %s is a serde name
+
+  /** Lets a plain Config be used wherever a SerializerConfig is expected. */
+  implicit def Config2Serializer(config: Config): SerializerConfig =
+    new SerializerConfig(config)
+}
+
+/**
+ * Config view exposing the serializer registry settings.
+ */
+class SerializerConfig(config: Config) extends ScalaMapConfig(config) {
+  /** Value of `serializers.registry.<name>.class`, or None when not configured. */
+  def getSerdeClass(name: String): Option[String] = getOption(SerializerConfig.SERDE format name)
+
+  /**
+   * Returns all serializer names from the config file. Useful for
+   * getting individual serializers.
+   *
+   * A name is every key under `serializers.registry.` that ends with
+   * `.class`, with only that trailing suffix removed. Stripping the suffix
+   * (rather than replacing every ".class" occurrence) keeps names such as
+   * "avro.classic" intact.
+   */
+  def getSerdeNames(): Iterable[String] = {
+    val classSuffix = ".class"
+    // NOTE(review): presumably the `true` flag makes subset() strip the prefix from the returned keys - confirm against Config.subset.
+    val subConf = config.subset("serializers.registry.", true)
+    subConf.keys.filter(k => k.endsWith(classSuffix)).map(k => k.stripSuffix(classSuffix))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
new file mode 100644
index 0000000..fd8dab8
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+/**
+ * Environment variable names, configuration keys and the implicit wrapper
+ * for shell-command settings.
+ */
+object ShellCommandConfig {
+  /**
+   * Environment variable that stores a JSON serialized map of all configuration.
+   */
+  val ENV_CONFIG = "STREAMING_CONFIG"
+
+  /**
+   * Environment variable holding a CSV list of the partition IDs that a
+   * TaskRunner is responsible for (e.g. 0,2,4,6).
+   */
+  val ENV_PARTITION_IDS = "PARTITION_IDS"
+
+  /**
+   * Environment variable holding a name for a TaskRunner.
+   */
+  val ENV_TASK_NAME = "TASK_NAME"
+
+  /**
+   * Environment variable holding arguments to be passed to the process
+   * running the TaskRunner (or equivalent, for non JVM languages).
+   */
+  val ENV_SAMZA_OPTS = "SAMZA_OPTS"
+
+  // config key read by ShellCommandConfig.getCommand
+  val COMMAND_SHELL_EXECUTE = "task.execute"
+  // config key read by ShellCommandConfig.getTaskOpts
+  val TASK_JVM_OPTS = "task.opts"
+
+  /** Lets a plain Config be used wherever a ShellCommandConfig is expected. */
+  implicit def Config2ShellCommand(config: Config): ShellCommandConfig =
+    new ShellCommandConfig(config)
+}
+
+/**
+ * Config view exposing the shell-command settings.
+ */
+class ShellCommandConfig(config: Config) extends ScalaMapConfig(config) {
+  import ShellCommandConfig.{ COMMAND_SHELL_EXECUTE, TASK_JVM_OPTS }
+
+  /** Value of `task.execute`, falling back to "bin/run-task.sh" when absent. */
+  def getCommand: String = getOption(COMMAND_SHELL_EXECUTE) match {
+    case Some(command) => command
+    case None => "bin/run-task.sh"
+  }
+
+  /** Value of `task.opts`, or None when not configured. */
+  def getTaskOpts: Option[String] = getOption(TASK_JVM_OPTS)
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
new file mode 100644
index 0000000..abcb034
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import scala.collection.JavaConversions._
+
+import grizzled.slf4j.Logging
+
+/**
+ * Configuration keys and the implicit wrapper for store settings. In every
+ * key the %s placeholder is filled in with a store name.
+ */
+object StorageConfig {
+  // storage config constants
+  val FACTORY = "stores.%s.factory"
+  val KEY_SERDE = "stores.%s.key.serde"
+  val MSG_SERDE = "stores.%s.msg.serde"
+  val CHANGELOG_STREAM = "stores.%s.changelog"
+
+  /** Lets a plain Config be used wherever a StorageConfig is expected. */
+  implicit def Config2Storage(config: Config): StorageConfig =
+    new StorageConfig(config)
+}
+
+/**
+ * Config view exposing per-store settings. The single-key getters return
+ * None when the key for the given store name is not configured.
+ */
+class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging {
+  import StorageConfig._
+
+  /** Value of `stores.<name>.factory`. */
+  def getStorageFactoryClassName(name: String): Option[String] = getOption(FACTORY format name)
+
+  /** Value of `stores.<name>.key.serde`. */
+  def getStorageKeySerde(name: String): Option[String] = getOption(KEY_SERDE format name)
+
+  /** Value of `stores.<name>.msg.serde`. */
+  def getStorageMsgSerde(name: String): Option[String] = getOption(MSG_SERDE format name)
+
+  /** Value of `stores.<name>.changelog`. */
+  def getChangelogStream(name: String): Option[String] = getOption(CHANGELOG_STREAM format name)
+
+  /**
+   * Names of all stores: every key under `stores.` that ends with
+   * `.factory`, with that trailing suffix removed.
+   */
+  def getStoreNames: Seq[String] = {
+    val factorySuffix = ".factory"
+    val storeKeys = config.subset("stores.", true).keys
+    storeKeys.collect {
+      case key if key.endsWith(factorySuffix) => key.stripSuffix(factorySuffix)
+    }.toSeq
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
new file mode 100644
index 0000000..517e9ae
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import grizzled.slf4j.Logging
+import scala.collection.JavaConversions._
+import org.apache.samza.SamzaException
+import org.apache.samza.util.Util
+import org.apache.samza.system.SystemStream
+
+/**
+ * Configuration keys and the implicit wrapper for per-stream settings. Each
+ * key takes two %s placeholders: the system name, then the stream name.
+ */
+object StreamConfig {
+  // stream config constants
+  val STREAM_PREFIX = "systems.%s.streams.%s."
+  val MSG_SERDE = STREAM_PREFIX + "samza.msg.serde"
+  val KEY_SERDE = STREAM_PREFIX + "samza.key.serde"
+  val CONSUMER_RESET_OFFSET = STREAM_PREFIX + "samza.reset.offset"
+
+  /** Lets a plain Config be used wherever a StreamConfig is expected. */
+  implicit def Config2Stream(config: Config): StreamConfig =
+    new StreamConfig(config)
+}
+
+/**
+ * Config view exposing per-stream settings (serdes and reset-offset flags).
+ */
+class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
+  /** Value of `systems.<system>.streams.<stream>.samza.msg.serde`, if configured. */
+  def getStreamMsgSerde(systemStream: SystemStream): Option[String] =
+    getOption(StreamConfig.MSG_SERDE format (systemStream.getSystem, systemStream.getStream))
+
+  /** Value of `systems.<system>.streams.<stream>.samza.key.serde`, if configured. */
+  def getStreamKeySerde(systemStream: SystemStream): Option[String] =
+    getOption(StreamConfig.KEY_SERDE format (systemStream.getSystem, systemStream.getStream))
+
+  /**
+   * Maps every stream of `systemName` that has a `.samza.reset.offset` key
+   * to its parsed reset flag (see getResetOffset).
+   *
+   * Only the trailing suffix is stripped from the key to obtain the stream
+   * name, so a stream whose own name contains ".samza.reset.offset" is not
+   * mangled.
+   */
+  def getResetOffsetMap(systemName: String): Map[SystemStream, Boolean] = {
+    val resetSuffix = ".samza.reset.offset"
+    // NOTE(review): presumably the `true` flag makes subset() strip the prefix from the returned keys - confirm against Config.subset.
+    val subConf = config.subset("systems.%s.streams." format systemName, true)
+    subConf
+      .keys
+      .filter(k => k.endsWith(resetSuffix))
+      .map(k => {
+        val systemStream = new SystemStream(systemName, k.stripSuffix(resetSuffix))
+        (systemStream, getResetOffset(systemStream))
+      }).toMap
+  }
+
+  /**
+   * Parses the reset-offset flag of a stream. Returns true only for the
+   * exact value "true"; a missing key or "false" yields false, and any other
+   * value logs a warning and yields false.
+   */
+  def getResetOffset(systemStream: SystemStream): Boolean = {
+    // Format the key once; it is needed for both the lookup and the warning.
+    val resetKey = StreamConfig.CONSUMER_RESET_OFFSET format (systemStream.getSystem, systemStream.getStream)
+    getOption(resetKey) match {
+      case Some("true") => true
+      case Some("false") => false
+      case Some(resetOffset) =>
+        warn("Got a configuration for %s that is not true, or false (was %s). Defaulting to false." format (resetKey, resetOffset))
+        false
+      case _ => false
+    }
+  }
+
+  /**
+   * Returns the set of all SystemStreams that have a serde defined from the config file.
+   */
+  def getSerdeStreams(systemName: String): Set[SystemStream] = {
+    // Strip whichever serde suffix the key actually ends with instead of
+    // relying on both suffixes happening to be 16 characters long.
+    val serdeSuffixes = Seq(".samza.msg.serde", ".samza.key.serde")
+    val subConf = config.subset("systems.%s.streams." format systemName, true)
+    subConf
+      .keys
+      .flatMap(k => serdeSuffixes
+        .find(suffix => k.endsWith(suffix))
+        .map(suffix => new SystemStream(systemName, k.stripSuffix(suffix))))
+      .toSet
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
new file mode 100644
index 0000000..ce63a8a
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import scala.collection.JavaConversions._
+
+import grizzled.slf4j.Logging
+
+/**
+ * Configuration keys and the implicit wrapper for per-system settings. In
+ * every key the %s placeholder is filled in with a system name.
+ */
+object SystemConfig {
+  // system config constants
+  val SYSTEM_PREFIX = "systems.%s."
+  val SYSTEM_FACTORY = "systems.%s.samza.factory"
+  val KEY_SERDE = "systems.%s.samza.key.serde"
+  val MSG_SERDE = "systems.%s.samza.msg.serde"
+
+  /** Lets a plain Config be used wherever a SystemConfig is expected. */
+  implicit def Config2System(config: Config): SystemConfig =
+    new SystemConfig(config)
+}
+
+/**
+ * Config view exposing per-system settings.
+ */
+class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
+  /** Value of `systems.<name>.samza.factory`, or None when not configured. */
+  def getSystemFactory(name: String): Option[String] = getOption(SystemConfig.SYSTEM_FACTORY format name)
+
+  /** Value of `systems.<name>.samza.key.serde`, or None when not configured. */
+  def getSystemKeySerde(name: String): Option[String] = getOption(SystemConfig.KEY_SERDE format name)
+
+  /** Value of `systems.<name>.samza.msg.serde`, or None when not configured. */
+  def getSystemMsgSerde(name: String): Option[String] = getOption(SystemConfig.MSG_SERDE format name)
+
+  /**
+   * Returns all system names from the config file. Useful for
+   * getting individual systems.
+   */
+  def getSystemNames(): Iterable[String] = {
+    val factorySuffix = ".samza.factory"
+    // NOTE(review): presumably the `true` flag makes subset() strip the prefix from the returned keys - confirm against Config.subset.
+    val subConf = config.subset("systems.", true)
+    // find all .samza.factory keys, and strip only the trailing suffix so a
+    // name that itself contains ".samza.factory" is left intact
+    subConf.keys.filter(k => k.endsWith(factorySuffix)).map(k => k.stripSuffix(factorySuffix))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
new file mode 100644
index 0000000..0c742d8
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.SamzaException
+import org.apache.samza.util.Util
+import org.apache.samza.system.SystemStream
+
+/**
+ * Configuration keys and the implicit wrapper for task-level settings.
+ */
+object TaskConfig {
+  // task config constants
+  val INPUT_STREAMS = "task.inputs" // streaming.input-streams
+  val WINDOW_MS = "task.window.ms" // window period in milliseconds
+  val COMMIT_MS = "task.commit.ms" // commit period in milliseconds
+  val TASK_CLASS = "task.class" // streaming.task-factory-class
+  val COMMAND_BUILDER = "task.command.class" // class name returned by getCommandClass
+  val LIFECYCLE_LISTENERS = "task.lifecycle.listeners" // li-generator,foo
+  val LIFECYCLE_LISTENER = "task.lifecycle.listener.%s.class" // task.lifecycle.listener.li-generator.class
+  val CHECKPOINT_MANAGER_FACTORY = "task.checkpoint.factory" // class name to use when sending offset checkpoints
+  val TASK_JMX_ENABLED = "task.jmx.enabled" // Start up a JMX server for this task?
+
+  /** Lets a plain Config be used wherever a TaskConfig is expected. */
+  implicit def Config2Task(config: Config): TaskConfig =
+    new TaskConfig(config)
+}
+
+/**
+ * Config view exposing the task-level settings.
+ */
+class TaskConfig(config: Config) extends ScalaMapConfig(config) {
+  /**
+   * The set of input SystemStreams parsed from the comma-separated
+   * `task.inputs` value. Empty when the key is missing or has an empty value.
+   */
+  def getInputStreams: Set[SystemStream] =
+    getOption(TaskConfig.INPUT_STREAMS)
+      .filter(streams => streams.length > 0)
+      .map(streams => streams.split(",").map(names => Util.getSystemStreamFromNames(names)).toSet)
+      .getOrElse(Set[SystemStream]())
+
+  /** `task.window.ms` parsed as a Long; None when the key is not configured. */
+  def getWindowMs: Option[Long] = getOption(TaskConfig.WINDOW_MS).map(ms => ms.toLong)
+
+  /** `task.commit.ms` parsed as a Long; None when the key is not configured. */
+  def getCommitMs: Option[Long] = getOption(TaskConfig.COMMIT_MS).map(ms => ms.toLong)
+
+  /** Raw (unsplit) value of `task.lifecycle.listeners`. */
+  def getLifecycleListeners(): Option[String] = getOption(TaskConfig.LIFECYCLE_LISTENERS)
+
+  /** Value of `task.lifecycle.listener.<name>.class` for the given listener name. */
+  def getLifecycleListenerClass(name: String): Option[String] = {
+    val key = TaskConfig.LIFECYCLE_LISTENER format name
+    getOption(key)
+  }
+
+  /** Value of `task.class`. */
+  def getTaskClass: Option[String] = getOption(TaskConfig.TASK_CLASS)
+
+  /** Value of `task.command.class`. */
+  def getCommandClass: Option[String] = getOption(TaskConfig.COMMAND_BUILDER)
+
+  /** Value of `task.checkpoint.factory`. */
+  def getCheckpointManagerFactory(): Option[String] = getOption(TaskConfig.CHECKPOINT_MANAGER_FACTORY)
+
+  /** `task.jmx.enabled` read through getBoolean with `true` as the supplied default. */
+  def getJmxServerEnabled = getBoolean(TaskConfig.TASK_JMX_ENABLED, true)
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/factories/PropertiesConfigFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/factories/PropertiesConfigFactory.scala b/samza-core/src/main/scala/org/apache/samza/config/factories/PropertiesConfigFactory.scala
new file mode 100644
index 0000000..768cfa4
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/factories/PropertiesConfigFactory.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config.factories
+import java.io.FileInputStream
+import java.net.URI
+import java.util.Properties
+import scala.collection.JavaConversions._
+import org.apache.samza.config.Config
+import org.apache.samza.config.ConfigFactory
+import org.apache.samza.config.MapConfig
+import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
+
+/**
+ * ConfigFactory that builds a Config from a Java properties file on the
+ * local file system.
+ */
+class PropertiesConfigFactory extends ConfigFactory with Logging {
+  /**
+   * Loads the properties file addressed by `configUri` into a MapConfig.
+   *
+   * @param configUri URI whose scheme is either absent or "file"; its path
+   *                  component is opened as the properties file.
+   * @return a MapConfig holding every property read from the file.
+   * @throws SamzaException if the URI has a scheme other than "file".
+   */
+  def getConfig(configUri: URI): Config = {
+    val scheme = configUri.getScheme
+    if (scheme != null && !scheme.equals("file")) {
+      throw new SamzaException("only the file:// scheme is supported for properties files")
+    }
+
+    val configPath = configUri.getPath
+    val props = new Properties()
+    val in = new FileInputStream(configPath)
+
+    // Close the stream even when load() throws, so a malformed or unreadable
+    // file does not leak the file handle.
+    try {
+      props.load(in)
+    } finally {
+      in.close()
+    }
+
+    debug("got config %s from config %s" format (props, configPath))
+
+    new MapConfig(props.toMap[String, String])
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/config/serializers/JsonConfigSerializer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/serializers/JsonConfigSerializer.scala b/samza-core/src/main/scala/org/apache/samza/config/serializers/JsonConfigSerializer.scala
new file mode 100644
index 0000000..60e65ea
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/serializers/JsonConfigSerializer.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config.serializers
+import scala.collection.JavaConversions._
+
+import org.codehaus.jackson.map.ObjectMapper
+
+import org.apache.samza.config.Config
+import org.apache.samza.config.MapConfig
+
+import java.util.HashMap
+
+/**
+ * Converts a Config to and from a JSON string using a single ObjectMapper.
+ */
+object JsonConfigSerializer {
+  val jsonMapper = new ObjectMapper()
+
+  /** Reads the JSON text into a HashMap and wraps the result in a MapConfig. */
+  def fromJson(string: String): Config = {
+    val mapClass = classOf[HashMap[String, String]]
+    val entries = jsonMapper.readValue(string, mapClass)
+    new MapConfig(entries)
+  }
+
+  /** Copies the config into a HashMap and writes that map out as a JSON string. */
+  def toJson(config: Config) = {
+    val entries = new HashMap[String, String](config)
+    jsonMapper.writeValueAsString(entries)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
new file mode 100644
index 0000000..2243b5c
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container
+
+import java.io.File
+import grizzled.slf4j.Logging
+import org.apache.samza.Partition
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.checkpoint.CheckpointManagerFactory
+import org.apache.samza.config.Config
+import org.apache.samza.config.MetricsConfig.Config2Metrics
+import org.apache.samza.config.SerializerConfig.Config2Serializer
+import org.apache.samza.config.ShellCommandConfig
+import org.apache.samza.config.StorageConfig.Config2Storage
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.config.serializers.JsonConfigSerializer
+import org.apache.samza.metrics.JmxServer
+import org.apache.samza.metrics.JvmMetrics
+import org.apache.samza.metrics.MetricsReporter
+import org.apache.samza.metrics.MetricsReporterFactory
+import org.apache.samza.serializers.Serde
+import org.apache.samza.serializers.SerdeFactory
+import org.apache.samza.serializers.SerdeManager
+import org.apache.samza.storage.StorageEngineFactory
+import org.apache.samza.storage.TaskStorageManager
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.task.StreamTask
+import org.apache.samza.task.TaskLifecycleListener
+import org.apache.samza.task.TaskLifecycleListenerFactory
+import org.apache.samza.util.Util
+import org.apache.samza.task.ReadableCoordinator
+import org.apache.samza.system.SystemProducers
+import org.apache.samza.task.ReadableCollector
+import org.apache.samza.system.DefaultPicker
+import org.apache.samza.system.SystemConsumers
+
+object SamzaContainer extends Logging {
+  /**
+   * Container entry point.
+   *
+   * Reads the container name, the JSON-serialized config and the
+   * comma-separated partition ids from the environment variables named in
+   * ShellCommandConfig, builds a SamzaContainer and runs it.
+   *
+   * @param args command line arguments; not used.
+   * @throws SamzaException if the config or partition id environment
+   *         variables are unset, or if the partition id list is empty.
+   */
+  def main(args: Array[String]) {
+    val jmxServer = new JmxServer
+
+    // All setup happens inside the try so that the JMX server is stopped
+    // even when reading the environment or wiring the container fails.
+    try {
+      val containerName = System.getenv(ShellCommandConfig.ENV_TASK_NAME)
+      val configStr = System.getenv(ShellCommandConfig.ENV_CONFIG)
+
+      // System.getenv returns null for an unset variable; fail with a
+      // descriptive message instead of an NPE inside the JSON parser.
+      if (configStr == null) {
+        throw new SamzaException("Environment variable %s is not set. Can't run a container without a configuration." format ShellCommandConfig.ENV_CONFIG)
+      }
+
+      val config = JsonConfigSerializer.fromJson(configStr)
+      val partitionIdsCsv = System.getenv(ShellCommandConfig.ENV_PARTITION_IDS)
+
+      // An unset variable is treated the same as an empty partition list.
+      val partitions = if (partitionIdsCsv != null && partitionIdsCsv.length > 0) {
+        partitionIdsCsv.split(",")
+          .map(partitionIdStr => new Partition(partitionIdStr.toInt))
+          .toSet
+      } else {
+        throw new SamzaException("No partitions for this task. Can't run a task without partition assignments. It's likely that the partition manager for this system doesn't know about the stream you're trying to read.")
+      }
+
+      SamzaContainer(containerName, partitions, config).run
+    } finally {
+      jmxServer.stop
+    }
+  }
+
+ /**
+ * Builds a fully wired SamzaContainer from configuration.
+ *
+ * Container-level objects (system consumers/producers, serdes, metrics
+ * reporters, checkpoint manager, lifecycle listeners) are created once and
+ * shared; one TaskInstance with its own task object, collector, metrics
+ * and stores is then created per partition.
+ *
+ * @param containerName name used for container metrics and passed to metrics reporter factories.
+ * @param partitions partitions to create task instances for.
+ * @param config job configuration; read through the implicit Config2* wrappers imported above.
+ * @return the assembled container; it is not started here.
+ * @throws SamzaException if a system factory, serde class, metrics reporter
+ * class, lifecycle listener class, storage factory, task class or
+ * changelog system referenced by the config is missing.
+ */
+ def apply(containerName: String, partitions: Set[Partition], config: Config) = {
+ info("Setting up Samza container: %s" format containerName)
+ info("Using partitions: %s" format partitions)
+ info("Using configuration: %s" format config)
+
+ val samzaContainerMetrics = new SamzaContainerMetrics(containerName)
+
+ val inputStreams = config.getInputStreams
+ val inputSystems = inputStreams.map(_.getSystem)
+
+ info("Got input streams: %s" format inputStreams)
+
+ val systemNames = config.getSystemNames
+
+ info("Got system names: %s" format systemNames)
+
+ // Merge the per-system reset-offset maps into a single map.
+ val resetInputStreams = systemNames.flatMap(systemName => {
+ config.getResetOffsetMap(systemName)
+ }).toMap
+
+ info("Got input stream resets: %s" format resetInputStreams)
+
+ // Union of the serde streams reported for every configured system.
+ val serdeStreams = systemNames.foldLeft(Set[SystemStream]())(_ ++ config.getSerdeStreams(_))
+
+ debug("Got serde streams: %s" format serdeStreams)
+
+ val serdeNames = config.getSerdeNames
+
+ info("Got serde names: %s" format serdeNames)
+
+ // Every configured system must name a factory class; instantiate each one.
+ val systemFactories = systemNames.map(systemName => {
+ val systemFactoryClassName = config
+ .getSystemFactory(systemName)
+ .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
+ (systemName, Util.getObj[SystemFactory](systemFactoryClassName))
+ }).toMap
+
+ info("Got system factories: %s" format systemFactories.keys)
+
+ // Consumers are only created for systems that have input streams. A
+ // consumer that fails to construct is logged and left out of the map
+ // rather than aborting setup.
+ // NOTE(review): systemFactories(systemName) is a plain map lookup, so an
+ // input system that is not among the configured system names presumably
+ // fails here with NoSuchElementException - confirm.
+ val consumers = inputSystems
+ .map(systemName => {
+ val systemFactory = systemFactories(systemName)
+
+ try {
+ (systemName, systemFactory.getConsumer(systemName, config, samzaContainerMetrics.registry))
+ } catch {
+ case e: Throwable =>
+ info("Failed to create a consumer for %s, so skipping." format systemName)
+ debug(e)
+ (systemName, null)
+ }
+ })
+ .filter(_._2 != null)
+ .toMap
+
+ info("Got system consumers: %s" format consumers.keys)
+
+ // Producers are attempted for every configured system, with the same
+ // log-and-skip handling as consumers.
+ val producers = systemFactories
+ .map {
+ case (systemName, systemFactory) =>
+ try {
+ (systemName, systemFactory.getProducer(systemName, config, samzaContainerMetrics.registry))
+ } catch {
+ case e: Throwable =>
+ info("Failed to create a producer for %s, so skipping." format systemName)
+ debug(e)
+ (systemName, null)
+ }
+ }
+ .filter(_._2 != null)
+ .toMap
+
+ info("Got system producers: %s" format producers.keys)
+
+ // Instantiate one serde per configured serde name via its factory class.
+ val serdes = serdeNames.map(serdeName => {
+ val serdeClassName = config
+ .getSerdeClass(serdeName)
+ .getOrElse(throw new SamzaException("No class defined for serde: %s." format serdeName))
+
+ val serde = Util.getObj[SerdeFactory[Object]](serdeClassName)
+ .getSerde(serdeName, config)
+
+ (serdeName, serde)
+ }).toMap
+
+ info("Got serdes: %s" format serdes.keys)
+
+ /*
+ * A Helper function to build a Map[String, Serde] (systemName -> Serde) for systems defined in the config. This is useful to build both key and message serde maps.
+ */
+ val buildSystemSerdeMap = (getSerdeName: (String) => Option[String]) => {
+ systemNames
+ .filter(getSerdeName(_).isDefined)
+ .map(systemName => {
+ val serdeName = getSerdeName(systemName).get
+ val serde = serdes.getOrElse(serdeName, throw new SamzaException("Serde %s for system %s does not exist in configuration." format (serdeName, systemName)))
+ (systemName, serde)
+ }).toMap
+ }
+
+ /*
+ * A Helper function to build a Map[SystemStream, Serde] for streams defined in the config. This is useful to build both key and message serde maps.
+ */
+ val buildSystemStreamSerdeMap = (getSerdeName: (SystemStream) => Option[String]) => {
+ (serdeStreams ++ inputStreams)
+ .filter(systemStream => getSerdeName(systemStream).isDefined)
+ .map(systemStream => {
+ val serdeName = getSerdeName(systemStream).get
+ val serde = serdes.getOrElse(serdeName, throw new SamzaException("Serde %s for system %s does not exist in configuration." format (serdeName, systemStream)))
+ (systemStream, serde)
+ }).toMap
+ }
+
+ val systemKeySerdes = buildSystemSerdeMap((systemName: String) => config.getSystemKeySerde(systemName))
+
+ debug("Got system key serdes: %s" format systemKeySerdes)
+
+ val systemMessageSerdes = buildSystemSerdeMap((systemName: String) => config.getSystemMsgSerde(systemName))
+
+ debug("Got system message serdes: %s" format systemMessageSerdes)
+
+ val systemStreamKeySerdes = buildSystemStreamSerdeMap((systemStream: SystemStream) => config.getStreamKeySerde(systemStream))
+
+ debug("Got system stream key serdes: %s" format systemStreamKeySerdes)
+
+ val systemStreamMessageSerdes = buildSystemStreamSerdeMap((systemStream: SystemStream) => config.getStreamMsgSerde(systemStream))
+
+ debug("Got system stream message serdes: %s" format systemStreamMessageSerdes)
+
+ // Store name -> changelog SystemStream, for stores that configure a changelog.
+ val changeLogSystemStreams = config
+ .getStoreNames
+ .filter(config.getChangelogStream(_).isDefined)
+ .map(name => (name, config.getChangelogStream(name).get)).toMap
+ .mapValues(Util.getSystemStreamFromNames(_))
+
+ info("Got change log system streams: %s" format changeLogSystemStreams)
+
+ val serdeManager = new SerdeManager(
+ serdes = serdes,
+ systemKeySerdes = systemKeySerdes,
+ systemMessageSerdes = systemMessageSerdes,
+ systemStreamKeySerdes = systemStreamKeySerdes,
+ systemStreamMessageSerdes = systemStreamMessageSerdes,
+ changeLogSystemStreams = changeLogSystemStreams.values.toSet)
+
+ info("Setting up JVM metrics.")
+
+ val jvm = new JvmMetrics(samzaContainerMetrics.registry)
+
+ info("Setting up incoming message envelope picker.")
+
+ val picker = new DefaultPicker
+
+ info("Setting up metrics reporters.")
+
+ // Reporter name -> reporter instance, built by each reporter's factory class.
+ val reporters = config.getMetricReporterNames.map(reporterName => {
+ val metricsFactoryClassName = config
+ .getMetricsFactoryClass(reporterName)
+ .getOrElse(throw new SamzaException("Metrics reporter %s missing .class config" format reporterName))
+
+ val reporter =
+ Util
+ .getObj[MetricsReporterFactory](metricsFactoryClassName)
+ .getMetricsReporter(reporterName, containerName, config)
+ (reporterName, reporter)
+ }).toMap
+
+ info("Got metrics reporters: %s" format reporters.keys)
+
+ // The checkpoint manager is optional: it stays null when no factory is configured.
+ val checkpointManager = config.getCheckpointManagerFactory match {
+ case Some(checkpointFactoryClassName) =>
+ Util
+ .getObj[CheckpointManagerFactory](checkpointFactoryClassName)
+ .getCheckpointManager(config, samzaContainerMetrics.registry)
+ case _ => null
+ }
+
+ info("Got checkpoint manager: %s" format checkpointManager)
+
+ val consumerMultiplexer = new SystemConsumers(
+ // TODO add config values for no new message timeout and max msgs per stream partition
+ picker = picker,
+ consumers = consumers,
+ serdeManager = serdeManager)
+
+ val producerMultiplexer = new SystemProducers(
+ producers = producers,
+ serdeManager = serdeManager)
+
+ // Lifecycle listeners are configured as a comma-separated list of names;
+ // each name must resolve to a factory class.
+ val listeners = config.getLifecycleListeners match {
+ case Some(listeners) => {
+ listeners.split(",").map(listenerName => {
+ info("Loading lifecycle listener: %s" format listenerName)
+
+ val listenerClassName = config.getLifecycleListenerClass(listenerName).getOrElse(throw new SamzaException("Referencing missing listener %s in config" format listenerName))
+
+ Util.getObj[TaskLifecycleListenerFactory](listenerClassName)
+ .getLifecyleListener(listenerName, config)
+ }).toList
+ }
+ case _ => {
+ info("No lifecycle listeners found")
+
+ List[TaskLifecycleListener]()
+ }
+ }
+
+ // TODO not sure how we should make this config based, or not. Kind of
+ // strange, since it has some dynamic directories when used with YARN.
+ val storeBaseDir = new File(System.getProperty("user.dir"), "state")
+
+ info("Got storage engine base directory: %s" format storeBaseDir)
+
+ // Store name -> storage engine factory; every configured store must name a factory class.
+ val storageEngineFactories = config
+ .getStoreNames
+ .map(storeName => {
+ val storageFactoryClassName = config
+ .getStorageFactoryClassName(storeName)
+ .getOrElse(throw new SamzaException("Missing storage factory for %s." format storeName))
+ (storeName, Util.getObj[StorageEngineFactory[Object, Object]](storageFactoryClassName))
+ }).toMap
+
+ info("Got storage engines: %s" format storageEngineFactories.keys)
+
+ val taskClassName = config
+ .getTaskClass
+ .getOrElse(throw new SamzaException("No task class defined in configuration."))
+
+ info("Got stream task class: %s" format taskClassName)
+
+ // Falls back to -1 when no window interval is configured.
+ val taskWindowMs = config.getWindowMs.getOrElse(-1L)
+
+ info("Got window milliseconds: %s" format taskWindowMs)
+
+ // Falls back to 60000 ms when no commit interval is configured.
+ val taskCommitMs = config.getCommitMs.getOrElse(60000L)
+
+ info("Got commit milliseconds: %s" format taskCommitMs)
+
+ // Wire up all task-level (unshared) objects.
+ val taskInstances = partitions.map(partition => {
+ debug("Setting up task instance: %s" format partition)
+
+ val task = Util.getObj[StreamTask](taskClassName)
+
+ val collector = new ReadableCollector
+
+ val taskInstanceMetrics = new TaskInstanceMetrics(partition)
+
+ // One changelog consumer per store, created from the changelog's system
+ // factory and registered against this task's metrics registry.
+ val storeConsumers = changeLogSystemStreams
+ .map {
+ case (storeName, changeLogSystemStream) =>
+ val systemConsumer = systemFactories
+ .getOrElse(changeLogSystemStream.getSystem, throw new SamzaException("Changelog system %s for store %s does not exist in the config." format (changeLogSystemStream, storeName)))
+ .getConsumer(changeLogSystemStream.getSystem, config, taskInstanceMetrics.registry)
+ (storeName, systemConsumer)
+ }.toMap
+
+ info("Got store consumers: %s" format storeConsumers)
+
+ // Build this partition's storage engines. The changelog partition and
+ // the key/message serdes are passed as null when not configured.
+ // NOTE(review): serdes(...) is a plain map lookup, so a store naming an
+ // undefined serde presumably fails with NoSuchElementException rather
+ // than a SamzaException - confirm.
+ val taskStores = storageEngineFactories
+ .map {
+ case (storeName, storageEngineFactory) =>
+ val changeLogSystemStreamPartition = if (changeLogSystemStreams.contains(storeName)) {
+ new SystemStreamPartition(changeLogSystemStreams(storeName), partition)
+ } else {
+ null
+ }
+ val keySerde = config.getStorageKeySerde(storeName) match {
+ case Some(keySerde) => serdes(keySerde)
+ case _ => null
+ }
+ val msgSerde = config.getStorageMsgSerde(storeName) match {
+ case Some(msgSerde) => serdes(msgSerde)
+ case _ => null
+ }
+ val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, partition)
+ val storageEngine = storageEngineFactory.getStorageEngine(
+ storeName,
+ storePartitionDir,
+ keySerde,
+ msgSerde,
+ collector,
+ config,
+ taskInstanceMetrics.registry,
+ changeLogSystemStreamPartition)
+ (storeName, storageEngine)
+ }
+
+ info("Got task stores: %s" format taskStores)
+
+ val storageManager = new TaskStorageManager(
+ partition = partition,
+ taskStores = taskStores,
+ storeConsumers = storeConsumers,
+ changeLogSystemStreams = changeLogSystemStreams,
+ storeBaseDir = storeBaseDir)
+
+ // The multiplexers, checkpoint manager, reporters and listeners are the
+ // shared container-level objects built above.
+ val taskInstance = new TaskInstance(
+ task = task,
+ partition = partition,
+ config = config,
+ metrics = taskInstanceMetrics,
+ consumerMultiplexer = consumerMultiplexer,
+ producerMultiplexer = producerMultiplexer,
+ storageManager = storageManager,
+ checkpointManager = checkpointManager,
+ reporters = reporters,
+ listeners = listeners,
+ inputStreams = inputStreams,
+ resetInputStreams = resetInputStreams,
+ windowMs = taskWindowMs,
+ commitMs = taskCommitMs,
+ collector = collector)
+
+ (partition, taskInstance)
+ }).toMap
+
+ info("Samza container setup complete.")
+
+ new SamzaContainer(
+ taskInstances = taskInstances,
+ config = config,
+ consumerMultiplexer = consumerMultiplexer,
+ producerMultiplexer = producerMultiplexer,
+ checkpointManager = checkpointManager,
+ metrics = samzaContainerMetrics,
+ reporters = reporters,
+ jvm = jvm)
+ }
+}
+
+/**
+ * Runs a fixed set of task instances in a single-threaded loop.
+ *
+ * Each loop iteration processes at most one incoming envelope, then gives
+ * every task instance a chance to window, send and commit. The loop ends
+ * when a coordinator reports a shutdown request or an exception escapes.
+ *
+ * The checkpoint manager and JVM metrics are optional and may be null.
+ */
+class SamzaContainer(
+  taskInstances: Map[Partition, TaskInstance],
+  config: Config,
+  consumerMultiplexer: SystemConsumers,
+  producerMultiplexer: SystemProducers,
+  checkpointManager: CheckpointManager = null,
+  metrics: SamzaContainerMetrics = new SamzaContainerMetrics,
+  reporters: Map[String, MetricsReporter] = Map(),
+  jvm: JvmMetrics = null) extends Runnable with Logging {
+
+  /**
+   * Starts all components, runs the process loop until shutdown is
+   * requested, and then shuts the components down in reverse order.
+   * Exceptions from the loop are logged and rethrown; the shutdown
+   * sequence runs either way.
+   */
+  def run {
+    info("Entering run loop.")
+
+    // Start-up happens outside the try block, so a failure here does not
+    // trigger the shutdown sequence below.
+    startMetrics
+    startCheckpoints
+    startStores
+    startTask
+    startProducers
+    startConsumers
+
+    try {
+      var shutdownSeen = false
+
+      while (!shutdownSeen) {
+        // A fresh coordinator per iteration, so requests do not carry over.
+        val coordinator = new ReadableCoordinator
+
+        process(coordinator)
+        window(coordinator)
+        send
+        commit(coordinator)
+
+        if (coordinator.shutdownRequested) {
+          info("Shutdown requested.")
+
+          shutdownSeen = true
+        }
+      }
+    } catch {
+      case failure: Throwable =>
+        error("Caught exception in process loop.", failure)
+        throw failure
+    } finally {
+      info("Shutting down.")
+
+      shutdownConsumers
+      shutdownProducers
+      shutdownTask
+      shutdownStores
+      shutdownCheckpoints
+      shutdownMetrics
+
+      info("Shutdown complete.")
+    }
+  }
+
+  /** Registers task metrics, starts JVM metrics if present, then starts every reporter. */
+  def startMetrics {
+    info("Registering task instances with metrics.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.registerMetrics
+    }
+
+    info("Starting JVM metrics.")
+
+    for (jvmMetrics <- Option(jvm)) {
+      jvmMetrics.start
+    }
+
+    info("Starting metrics reporters.")
+
+    for (reporter <- reporters.values) {
+      reporter.register(metrics.source, metrics.registry)
+      reporter.start
+    }
+  }
+
+  /** Registers every task with the checkpoint manager, then starts the manager if one exists. */
+  def startCheckpoints {
+    info("Registering task instances with checkpoints.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.registerCheckpoints
+    }
+
+    if (checkpointManager == null) {
+      warn("No checkpoint manager defined. No consumer offsets will be maintained for this job.")
+    } else {
+      info("Registering checkpoint manager.")
+
+      checkpointManager.start
+    }
+  }
+
+  /** Starts the stores of every task instance. */
+  def startStores {
+    info("Starting task instance stores.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.startStores
+    }
+  }
+
+  /** Initializes the stream task of every task instance. */
+  def startTask {
+    info("Initializing stream tasks.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.initTask
+    }
+  }
+
+  /** Registers every task with the producer multiplexer, then starts it. */
+  def startProducers {
+    info("Registering task instances with producers.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.registerProducers
+    }
+
+    info("Starting producer multiplexer.")
+
+    producerMultiplexer.start
+  }
+
+  /** Registers every task with the consumer multiplexer, then starts it. */
+  def startConsumers {
+    info("Registering task instances with consumers.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.registerConsumers
+    }
+
+    info("Starting consumer multiplexer.")
+
+    consumerMultiplexer.start
+  }
+
+  /**
+   * Picks at most one envelope from the consumers and hands it to the task
+   * instance that owns the envelope's partition. Does nothing but trace
+   * when no envelope is available.
+   */
+  def process(coordinator: ReadableCoordinator) {
+    trace("Attempting to pick a message to process.")
+
+    val picked = consumerMultiplexer.pick
+
+    if (picked == null) {
+      trace("No incoming message envelope was available.")
+    } else {
+      val owningPartition = picked.getSystemStreamPartition.getPartition
+
+      trace("Processing incoming message envelope for partition %s." format owningPartition)
+
+      taskInstances(owningPartition).process(picked, coordinator)
+    }
+  }
+
+  /** Offers every task instance a chance to window. */
+  def window(coordinator: ReadableCoordinator) {
+    trace("Windowing stream tasks.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.window(coordinator)
+    }
+  }
+
+  /** Triggers send on every task instance. */
+  def send {
+    trace("Triggering send in task instances.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.send
+    }
+  }
+
+  /** Offers every task instance a chance to commit. */
+  def commit(coordinator: ReadableCoordinator) {
+    trace("Committing task instances.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.commit(coordinator)
+    }
+  }
+
+  /** Stops the consumer multiplexer. */
+  def shutdownConsumers {
+    info("Shutting down consumer multiplexer.")
+
+    consumerMultiplexer.stop
+  }
+
+  /** Stops the producer multiplexer. */
+  def shutdownProducers {
+    info("Shutting down producer multiplexer.")
+
+    producerMultiplexer.stop
+  }
+
+  /** Shuts down the stream task of every task instance. */
+  def shutdownTask {
+    info("Shutting down task instance stream tasks.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.shutdownTask
+    }
+  }
+
+  /** Shuts down the stores of every task instance. */
+  def shutdownStores {
+    info("Shutting down task instance stores.")
+
+    for (taskInstance <- taskInstances.values) {
+      taskInstance.shutdownStores
+    }
+  }
+
+  /** Stops the checkpoint manager when one was supplied. */
+  def shutdownCheckpoints {
+    if (checkpointManager == null) {
+      info("No checkpoint manager defined, so skipping checkpoint manager stop.")
+    } else {
+      info("Shutting down checkpoint manager.")
+      checkpointManager.stop
+    }
+  }
+
+  /** Stops every metrics reporter, then JVM metrics when present. */
+  def shutdownMetrics {
+    info("Shutting down metrics reporters.")
+
+    for (reporter <- reporters.values) {
+      reporter.stop
+    }
+
+    for (jvmMetrics <- Option(jvm)) {
+      info("Shutting down JVM metrics.")
+
+      jvmMetrics.stop
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
new file mode 100644
index 0000000..81cf356
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container
+
+import org.apache.samza.metrics.ReadableMetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
+
+/**
+ * Metrics holder for a single container.
+ *
+ * @param containerName name of the container; defaults to "unnamed-container".
+ * @param registry registry the container's metrics are created in; a new
+ *        MetricsRegistryMap by default.
+ */
+class SamzaContainerMetrics(
+  val containerName: String = "unnamed-container",
+  val registry: ReadableMetricsRegistry = new MetricsRegistryMap) {
+
+  // Counter named "commits" in the "samza.task.SamzaContainer" group.
+  val commits = registry.newCounter("samza.task.SamzaContainer", "commits")
+
+  // The metrics source is simply the container name.
+  val source = containerName
+
+  // TODO .. etc ..
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
new file mode 100644
index 0000000..5fc9316
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container
+
+import org.apache.samza.metrics.MetricsReporter
+import org.apache.samza.config.Config
+import org.apache.samza.Partition
+import grizzled.slf4j.Logging
+import scala.collection.JavaConversions._
+import org.apache.samza.storage.TaskStorageManager
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.task.TaskContext
+import org.apache.samza.task.ClosableTask
+import org.apache.samza.task.InitableTask
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.task.WindowableTask
+import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.task.TaskLifecycleListener
+import org.apache.samza.task.StreamTask
+import org.apache.samza.system.SystemStream
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.task.ReadableCollector
+import org.apache.samza.system.SystemConsumers
+import org.apache.samza.system.SystemProducers
+import org.apache.samza.task.ReadableCoordinator
+
+/**
+ * Wraps one StreamTask bound to one partition and drives its lifecycle:
+ * registration with metrics/checkpoints/producers/consumers, init, process,
+ * window, send, commit and shutdown.
+ *
+ * The storage manager and checkpoint manager are optional (null by default);
+ * every method that touches them checks for null first.
+ *
+ * @param task the user task; InitableTask, WindowableTask and ClosableTask
+ *        are detected once at construction time.
+ * @param partition partition this instance is responsible for.
+ * @param resetInputStreams streams whose checkpointed offsets are ignored
+ *        when registering consumers.
+ * @param queueSize not referenced anywhere in this class.
+ * @param windowMs minimum time between window calls; a negative value
+ *        disables windowing.
+ * @param commitMs minimum time between time-triggered commits.
+ * @param clock source of the current time in milliseconds.
+ * @param collector collects the task's outgoing envelopes until send is called.
+ */
+class TaskInstance(
+  task: StreamTask,
+  partition: Partition,
+  config: Config,
+  metrics: TaskInstanceMetrics,
+  consumerMultiplexer: SystemConsumers,
+  producerMultiplexer: SystemProducers,
+  storageManager: TaskStorageManager = null,
+  checkpointManager: CheckpointManager = null,
+  reporters: Map[String, MetricsReporter] = Map(),
+  listeners: Seq[TaskLifecycleListener] = Seq(),
+  inputStreams: Set[SystemStream] = Set(),
+  resetInputStreams: Map[SystemStream, Boolean] = Map(),
+  queueSize: Int = 1000,
+  windowMs: Long = -1,
+  commitMs: Long = 60000,
+  clock: () => Long = { System.currentTimeMillis },
+  collector: ReadableCollector = new ReadableCollector) extends Logging {
+
+  // Last known offset per stream; loaded from the checkpoint in
+  // registerConsumers and updated after every processed envelope.
+  var offsets = Map[SystemStream, String]()
+  var lastWindowMs = 0L
+  var lastCommitMs = 0L
+  val isInitableTask = task.isInstanceOf[InitableTask]
+  val isWindowableTask = task.isInstanceOf[WindowableTask]
+  val isClosableTask = task.isInstanceOf[ClosableTask]
+
+  // Context handed to the task and to lifecycle listeners.
+  val context = new TaskContext {
+    def getMetricsRegistry = metrics.registry
+    def getPartition = partition
+    def getStore(storeName: String) = if (storageManager != null) {
+      storageManager(storeName)
+    } else {
+      warn("No store found for name: %s" format storeName)
+
+      null
+    }
+  }
+
+  /** Registers this task's metrics registry with every reporter. */
+  def registerMetrics {
+    debug("Registering metrics for partition: %s." format partition)
+
+    reporters.values.foreach(_.register(metrics.source, metrics.registry))
+  }
+
+  /** Registers this partition with the checkpoint manager, if there is one. */
+  def registerCheckpoints {
+    if (checkpointManager != null) {
+      debug("Registering checkpoint manager for partition: %s." format partition)
+
+      checkpointManager.register(partition)
+    } else {
+      debug("Skipping checkpoint manager registration for partition: %s." format partition)
+    }
+  }
+
+  /** Initializes the storage manager, if there is one. */
+  def startStores {
+    if (storageManager != null) {
+      debug("Starting storage manager for partition: %s." format partition)
+
+      storageManager.init(collector)
+    } else {
+      debug("Skipping storage manager initialization for partition: %s." format partition)
+    }
+  }
+
+  /** Calls init on the task when it is an InitableTask, bracketed by the listeners' init hooks. */
+  def initTask {
+    listeners.foreach(_.beforeInit(config, context))
+
+    if (isInitableTask) {
+      debug("Initializing task for partition: %s." format partition)
+
+      task.asInstanceOf[InitableTask].init(config, context)
+    } else {
+      debug("Skipping task initialization for partition: %s." format partition)
+    }
+
+    listeners.foreach(_.afterInit(config, context))
+  }
+
+  /** Registers this task's metrics source with the producer multiplexer. */
+  def registerProducers {
+    debug("Registering producers for partition: %s." format partition)
+
+    producerMultiplexer.register(metrics.source)
+  }
+
+  /**
+   * Loads the last checkpoint (when a checkpoint manager exists) and
+   * registers every input stream with the consumer multiplexer. Streams
+   * without a known offset, or configured to reset, are registered with a
+   * null offset.
+   */
+  def registerConsumers {
+    if (checkpointManager != null) {
+      debug("Loading checkpoints for partition: %s." format partition)
+
+      val checkpoint = checkpointManager.readLastCheckpoint(partition)
+
+      if (checkpoint != null) {
+        for ((systemStream, offset) <- checkpoint.getOffsets) {
+          if (!resetInputStreams.getOrElse(systemStream, false)) {
+            offsets += systemStream -> offset
+          } else {
+            info("Got offset %s for %s, but ignoring, since stream was configured to reset offsets." format (offset, systemStream))
+          }
+        }
+
+        info("Successfully loaded offsets for partition: %s, %s" format (partition, offsets))
+      } else {
+        warn("No checkpoint found for partition: %s. This is allowed if this is your first time running the job, but if it's not, you've probably lost data." format partition)
+      }
+    }
+
+    debug("Registering consumers for partition: %s." format partition)
+
+    inputStreams.foreach(stream =>
+      consumerMultiplexer.register(
+        new SystemStreamPartition(stream, partition),
+        offsets.getOrElse(stream, null)))
+  }
+
+  /**
+   * Passes one envelope to the task, bracketed by the listeners' process
+   * hooks, then records the envelope's offset.
+   */
+  def process(envelope: IncomingMessageEnvelope, coordinator: ReadableCoordinator) {
+    listeners.foreach(_.beforeProcess(envelope, config, context))
+
+    trace("Processing incoming message envelope for partition: %s, %s" format (partition, envelope.getSystemStreamPartition))
+
+    task.process(envelope, collector, coordinator)
+
+    listeners.foreach(_.afterProcess(envelope, config, context))
+
+    trace("Updating offset map for partition: %s, %s, %s" format (partition, envelope.getSystemStreamPartition, envelope.getOffset))
+
+    offsets += envelope.getSystemStreamPartition -> envelope.getOffset
+  }
+
+  /**
+   * Calls window on the task when it is a WindowableTask, windowing is
+   * enabled (windowMs >= 0) and more than windowMs has passed since the
+   * last window call.
+   */
+  def window(coordinator: ReadableCoordinator) {
+    if (isWindowableTask && windowMs >= 0 && lastWindowMs + windowMs < clock()) {
+      trace("Windowing for partition: %s" format partition)
+
+      task.asInstanceOf[WindowableTask].window(collector, coordinator)
+      lastWindowMs = clock()
+
+      trace("Assigned last window time for partition: %s, %s" format (partition, lastWindowMs))
+    } else {
+      trace("Skipping window for partition: %s" format partition)
+    }
+  }
+
+  /** Sends every collected envelope through the producer multiplexer and resets the collector. */
+  def send {
+    if (collector.envelopes.size > 0) {
+      trace("Sending messages for partition: %s, %s" format (partition, collector.envelopes.size))
+
+      collector.envelopes.foreach(envelope => producerMultiplexer.send(metrics.source, envelope))
+
+      trace("Resetting collector for partition: %s" format partition)
+
+      collector.reset
+    } else {
+      trace("Skipping send for partition %s because no messages were collected." format partition)
+    }
+  }
+
+  /**
+   * Commits when the commit interval has elapsed, or when the coordinator
+   * requested a commit or a shutdown. A commit flushes the stores (if any),
+   * commits the producers, and writes a checkpoint (if a checkpoint manager
+   * exists), in that order.
+   */
+  def commit(coordinator: ReadableCoordinator) {
+    if (lastCommitMs + commitMs < clock() || coordinator.isCommitRequested || coordinator.isShutdownRequested) {
+      // The storage manager is optional, like everywhere else in this
+      // class; without this guard a store-less task fails on first commit.
+      if (storageManager != null) {
+        trace("Flushing state stores for partition: %s" format partition)
+
+        storageManager.flush
+      } else {
+        trace("Skipping state store flush for partition: %s" format partition)
+      }
+
+      trace("Committing producers for partition: %s" format partition)
+
+      producerMultiplexer.commit(metrics.source)
+
+      if (checkpointManager != null) {
+        trace("Committing checkpoint manager for partition: %s" format partition)
+
+        checkpointManager.writeCheckpoint(partition, new Checkpoint(offsets))
+      }
+
+      lastCommitMs = clock()
+    }
+  }
+
+  /** Calls close on the task when it is a ClosableTask, bracketed by the listeners' close hooks. */
+  def shutdownTask {
+    listeners.foreach(_.beforeClose(config, context))
+
+    if (isClosableTask) {
+      debug("Shutting down stream task for partition: %s" format partition)
+
+      task.asInstanceOf[ClosableTask].close
+    } else {
+      debug("Skipping stream task shutdown for partition: %s" format partition)
+    }
+
+    listeners.foreach(_.afterClose(config, context))
+  }
+
+  /** Stops the storage manager, if there is one. */
+  def shutdownStores {
+    if (storageManager != null) {
+      debug("Shutting down storage manager for partition: %s" format partition)
+
+      storageManager.stop
+    } else {
+      debug("Skipping storage manager shutdown for partition: %s" format partition)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
new file mode 100644
index 0000000..07d72c7
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container
+
+import org.apache.samza.metrics.ReadableMetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.Partition
+
+/**
+ * Metrics holder for a single task instance.
+ *
+ * @param partition partition the task instance is bound to.
+ * @param registry registry the task's metrics are created in; a new
+ *        MetricsRegistryMap by default.
+ */
+class TaskInstanceMetrics(
+  val partition: Partition,
+  val registry: ReadableMetricsRegistry = new MetricsRegistryMap) {
+
+  // Counter named "commits" in the "samza.task.TaskInstance" group.
+  val commits = registry.newCounter("samza.task.TaskInstance", "commits")
+
+  // The metrics source is "Partition-" followed by the partition id.
+  val source = "Partition-%s".format(partition.getPartitionId)
+
+  // TODO .. etc ..
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
new file mode 100644
index 0000000..f3a75af
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job
+
+import java.lang.String
+import java.net.URI
+import org.apache.samza.job.ApplicationStatus.Running
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config._
+import grizzled.slf4j.Logging
+import joptsimple.OptionParser
+import joptsimple.util.KeyValuePair
+import org.apache.samza.SamzaException
+import org.apache.samza.config.factories.PropertiesConfigFactory
+import scala.Some
+import org.apache.samza.util.Util
+import scala.collection.mutable.Buffer
+import scala.collection.JavaConversions._
+
+/**
+ * Command line entry point. Parses the options declared below, loads one
+ * config per --config-path URI through the selected ConfigFactory, appends
+ * the --config key=value overrides as the last config, and runs a JobRunner
+ * on a MapConfig built from that list.
+ */
+object JobRunner extends Logging {
+  def main(args: Array[String]) {
+    val parser = new OptionParser()
+
+    // --config-factory: class used to read each config file.
+    val configFactoryOpt = parser
+      .accepts("config-factory", "The config factory to use to read your config file.")
+      .withRequiredArg
+      .ofType(classOf[java.lang.String])
+      .describedAs("com.foo.bar.ClassName")
+      .defaultsTo(classOf[PropertiesConfigFactory].getName)
+
+    // --config-path: one or more config file URIs.
+    val configPathOpt = parser
+      .accepts("config-path", "URI location to a config file (e.g. file:///some/local/path.properties). " +
+        "If multiple files are given they are all used with later files overriding any values that appear in earlier files.")
+      .withRequiredArg
+      .ofType(classOf[URI])
+      .describedAs("path")
+
+    // --config: individual key=value overrides.
+    val configOverrideOpt = parser
+      .accepts("config", "A configuration value in the form key=value. Command line properties override any configuration values given.")
+      .withRequiredArg
+      .ofType(classOf[KeyValuePair])
+      .describedAs("key=value")
+
+    val options = parser.parse(args: _*)
+
+    // At least one config path is mandatory: print usage and exit without it.
+    if (!options.has(configPathOpt)) {
+      parser.printHelpOn(System.err)
+      System.exit(-1)
+    }
+
+    // Resolve the factory and the inputs it will be fed.
+    val factoryClassName = options.valueOf(configFactoryOpt)
+    val paths = options.valuesOf(configPathOpt)
+    val factory = Class.forName(factoryClassName).newInstance.asInstanceOf[ConfigFactory]
+    val overrides = options.valuesOf(configOverrideOpt).map(pair => (pair.key, pair.value)).toMap
+
+    // One config per path, in the order given, with the overrides appended last.
+    val layers: Buffer[java.util.Map[String, String]] = paths.map(factory.getConfig)
+    layers += overrides
+
+    new JobRunner(new MapConfig(layers)).run
+  }
+}
+
+/**
+ * JobRunner sets up and submits a Samza job described by a Config. Any config
+ * rewriters named in the config are applied first; the configured
+ * StreamJobFactory is then instantiated, asked for a job, and the job is
+ * submitted.
+ */
+class JobRunner(config: Config) extends Logging with Runnable {
+
+  /**
+   * Rewrites the config, submits the job, and waits up to 500 ms for it to
+   * report Running. The outcome is only logged; nothing is returned.
+   * Throws SamzaException when no job factory class is configured.
+   */
+  def run() {
+    val conf = rewriteConfig(config)
+
+    val jobFactoryClass = conf.getStreamJobFactoryClass.getOrElse(
+      throw new SamzaException("no job factory class defined"))
+
+    val jobFactory = Class.forName(jobFactoryClass).newInstance.asInstanceOf[StreamJobFactory]
+
+    info("job factory: %s" format (jobFactoryClass))
+    debug("config: %s" format (conf))
+
+    // Build the job from the rewritten config and hand it off.
+    val job = jobFactory.getJob(conf).submit
+
+    info("waiting for job to start")
+
+    // waitForStatus may hand back null, which is treated as a failed start.
+    val appStatus = job.waitForStatus(Running, 500)
+    if (appStatus == null) {
+      warn("unable to start job successfully.")
+    } else if (Running.equals(appStatus)) {
+      info("job started successfully")
+    } else {
+      warn("unable to start job successfully. job has status %s" format (appStatus))
+    }
+
+    info("exiting")
+  }
+
+  /**
+   * Applies every config rewriter listed (comma-separated) in the config, in
+   * order, feeding each rewriter the output of the previous one. Returns the
+   * config unchanged when no rewriters are listed.
+   */
+  def rewriteConfig(config: Config): Config = {
+    // NOTE(review): the rewriter class is looked up in the original config,
+    // not in the partially rewritten one - confirm this is intended.
+    def applyRewriter(current: Config, rewriterName: String): Config = {
+      val rewriterClass = config.getConfigRewriterClass(rewriterName) match {
+        case Some(className) => className
+        case None => throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName)
+      }
+      val rewriter = Util.getObj[ConfigRewriter](rewriterClass)
+      info("Re-writing config file with " + rewriter)
+      rewriter.rewrite(rewriterName, current)
+    }
+
+    config.getConfigRewriters
+      .map(names => names.split(",").foldLeft(config)(applyRewriter))
+      .getOrElse(config)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
new file mode 100644
index 0000000..f55ca4c
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job
+
+import scala.collection.JavaConversions._
+import org.apache.samza.config.ShellCommandConfig
+import org.apache.samza.config.ShellCommandConfig.Config2ShellCommand
+import org.apache.samza.config.serializers.JsonConfigSerializer
+
+/**
+ * CommandBuilder that takes its command from the config and passes the task
+ * name, partition IDs, JSON-serialized config and task opts to that command
+ * through environment variables.
+ */
+class ShellCommandBuilder extends CommandBuilder {
+  def buildCommand() = config.getCommand
+
+  def buildEnvironment(): java.util.Map[String, String] = {
+    // Comma-separated partition IDs; the empty string when there are none.
+    val partitionIds = partitions.map(_.getPartitionId.toString).mkString(",")
+
+    val environment = Map(
+      ShellCommandConfig.ENV_TASK_NAME -> name,
+      ShellCommandConfig.ENV_PARTITION_IDS -> partitionIds,
+      ShellCommandConfig.ENV_CONFIG -> JsonConfigSerializer.toJson(config),
+      ShellCommandConfig.ENV_SAMZA_OPTS -> config.getTaskOpts.getOrElse(""))
+
+    // Converted to java.util.Map by the JavaConversions implicits.
+    environment
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
new file mode 100644
index 0000000..ddb119b
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.local
+import org.apache.samza.config.TaskConfig._
+import org.apache.samza.config.Config
+import org.apache.samza.config.SystemConfig._
+import org.apache.samza.config.StreamConfig._
+import org.apache.samza.config.ShellCommandConfig._
+import org.apache.samza.job.CommandBuilder
+import org.apache.samza.job.StreamJob
+import org.apache.samza.job.StreamJobFactory
+import scala.collection.JavaConversions._
+import org.apache.samza.Partition
+import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
+import org.apache.samza.container.SamzaContainer
+import org.apache.samza.util.Util
+import org.apache.samza.job.ShellCommandBuilder
+
+/**
+ * StreamJobFactory for running a job on the local machine. When a command
+ * builder class is configured the job runs in a child process (ProcessJob);
+ * otherwise it runs inside this JVM on a thread (ThreadJob).
+ */
+class LocalJobFactory extends StreamJobFactory with Logging {
+  def getJob(config: Config): StreamJob = {
+    val taskName = "local-task"
+    val partitions = Util.getMaxInputStreamPartitions(config)
+
+    info("got partitions for job %s" format partitions)
+
+    if (partitions.isEmpty) {
+      throw new SamzaException("No partitions were detected for your input streams. It's likely that the system(s) specified don't know about the input streams: %s" format config.getInputStreams)
+    }
+
+    // Builds a job that executes the command produced by the given builder
+    // class in its own process.
+    def processJob(cmdBuilderClassName: String): StreamJob = {
+      val cmdBuilder = Class.forName(cmdBuilderClassName).newInstance.asInstanceOf[CommandBuilder]
+
+      cmdBuilder
+        .setConfig(config)
+        .setName(taskName)
+        .setPartitions(partitions)
+
+      // The command line is split on single spaces to form the argument list.
+      val processBuilder = new ProcessBuilder(cmdBuilder.buildCommand.split(" ").toList)
+      processBuilder.environment.putAll(cmdBuilder.buildEnvironment)
+
+      new ProcessJob(processBuilder)
+    }
+
+    // Builds a job that runs the container in this process on a thread.
+    def threadJob: StreamJob = {
+      info("No config specified for %s. Defaulting to ThreadJob, which is only meant for debugging." format COMMAND_BUILDER)
+
+      // Task opts cannot take effect in a threaded job, so tell the developer.
+      if (config.getTaskOpts.isDefined) {
+        warn("%s was specified in config, but is not being used because job is being executed with ThreadJob. You probably want to run %s=%s." format (TASK_JVM_OPTS, COMMAND_BUILDER, classOf[ShellCommandBuilder].getName))
+      }
+
+      new ThreadJob(SamzaContainer(taskName, partitions, config))
+    }
+
+    config.getCommandClass match {
+      case Some(cmdBuilderClassName) => processJob(cmdBuilderClassName)
+      case _ => threadJob
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
new file mode 100644
index 0000000..a9ecd97
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.local
+
+import org.apache.samza.job.ApplicationStatus.New
+import org.apache.samza.job.ApplicationStatus.Running
+import org.apache.samza.job.ApplicationStatus.SuccessfulFinish
+import org.apache.samza.job.ApplicationStatus.UnsuccessfulFinish
+import org.apache.samza.job.ApplicationStatus
+import org.apache.samza.job.StreamJob
+import grizzled.slf4j.Logging
+import java.util.concurrent.CountDownLatch
+import java.io.BufferedReader
+import java.io.InputStreamReader
+import java.io.InputStream
+import java.io.OutputStream
+import org.apache.samza.SamzaException
+
+/**
+ * StreamJob that runs the command described by the supplied ProcessBuilder as
+ * a child process and forwards the child's stdout/stderr to this process's
+ * streams.
+ *
+ * Status moves from New to Running once the child has been started, and then
+ * to SuccessfulFinish (exit code 0) or UnsuccessfulFinish (non-zero exit
+ * code, failure to start, or kill).
+ */
+class ProcessJob(processBuilder: ProcessBuilder) extends StreamJob with Logging {
+  // Both fields are written by the launcher thread and read by caller threads.
+  @volatile var jobStatus: Option[ApplicationStatus] = None
+  @volatile var process: Process = null
+
+  // Launcher thread created by submit; it ends when the child process exits.
+  @volatile private var procThread: Thread = null
+
+  /**
+   * Starts the child process on a non-daemon thread and returns once the
+   * start attempt has completed, whether it succeeded or failed.
+   */
+  def submit: StreamJob = {
+    val waitForThreadStart = new CountDownLatch(1)
+    jobStatus = Some(New)
+
+    // create a non-daemon thread to make job runner block until the job finishes.
+    // without this, the proc dies when job runner ends.
+    procThread = new Thread {
+      override def run {
+        val started = try {
+          val proc = processBuilder.start
+
+          // pipe all output to this process's streams
+          val outThread = new Thread(new Piper(proc.getInputStream, System.out))
+          val errThread = new Thread(new Piper(proc.getErrorStream, System.err))
+          outThread.setDaemon(true)
+          errThread.setDaemon(true)
+          outThread.start
+          errThread.start
+
+          process = proc
+          jobStatus = Some(Running)
+          Some(proc)
+        } catch {
+          case e: Exception => {
+            error("Unable to start process.", e)
+            jobStatus = Some(UnsuccessfulFinish)
+            None
+          }
+        } finally {
+          // Always release submit, even when the process could not be started;
+          // otherwise the submitting thread would wait forever.
+          waitForThreadStart.countDown
+        }
+
+        started.foreach { proc =>
+          val exitCode = proc.waitFor
+
+          // kill may already have recorded a terminal status; keep it.
+          if (jobStatus.exists(_ == Running)) {
+            jobStatus = Some(if (exitCode == 0) SuccessfulFinish else UnsuccessfulFinish)
+          }
+        }
+      }
+    }
+
+    procThread.start
+    waitForThreadStart.await
+
+    ProcessJob.this
+  }
+
+  /**
+   * Destroys the child process, if one was started, and marks the job as
+   * UnsuccessfulFinish.
+   */
+  def kill: StreamJob = {
+    Option(process).foreach(_.destroy)
+    jobStatus = Some(UnsuccessfulFinish)
+    ProcessJob.this
+  }
+
+  /**
+   * Waits up to timeoutMs milliseconds for the child process to exit. As with
+   * Thread.join, a timeout of 0 waits indefinitely.
+   *
+   * @return the job status after waiting, or null if the job was never submitted
+   */
+  def waitForFinish(timeoutMs: Long) = {
+    // The launcher thread ends right after recording the terminal status, so
+    // joining it guarantees the status returned below is up to date.
+    Option(procThread).foreach(_.join(timeoutMs))
+    jobStatus.getOrElse(null)
+  }
+
+  /**
+   * Polls every 500 ms until the job reaches the given status or timeoutMs
+   * milliseconds have elapsed.
+   *
+   * @return the job status when polling stopped, or null if the job was never submitted
+   */
+  def waitForStatus(status: ApplicationStatus, timeoutMs: Long) = {
+    val start = System.currentTimeMillis
+
+    // Compare against the value inside the Option; comparing the status with
+    // the Option itself is never equal and would always run to the timeout.
+    while (System.currentTimeMillis - start < timeoutMs && !jobStatus.exists(_ == status)) {
+      Thread.sleep(500)
+    }
+
+    jobStatus.getOrElse(null)
+  }
+
+  /** Current job status, or null if the job was never submitted. */
+  def getStatus = jobStatus.getOrElse(null)
+}
+
+/**
+ * Forwards bytes from one stream to another until the input reaches end of
+ * stream. Used to pipe output from a subprocess to this process'
+ * stdout/stderr.
+ *
+ * Both streams are closed when forwarding ends. Any exception raised while
+ * forwarding is rethrown wrapped in a SamzaException.
+ */
+class Piper(in: InputStream, out: OutputStream) extends Runnable {
+  def run() {
+    try {
+      val buffer = new Array[Byte](512)
+      var count = in.read(buffer, 0, buffer.length)
+
+      // read returns -1 at end of stream
+      while (count > -1) {
+        out.write(buffer, 0, count)
+        // flush per chunk so forwarded output shows up promptly
+        out.flush()
+        count = in.read(buffer, 0, buffer.length)
+      }
+    } catch {
+      case e: Exception => throw new SamzaException("Broken pipe", e);
+    } finally {
+      // NOTE(review): out may be System.out/System.err (see ProcessJob);
+      // closing it here closes this process's own stream - confirm intended.
+      try {
+        in.close()
+      } finally {
+        // close out even when closing in fails
+        out.close()
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
new file mode 100644
index 0000000..62994b0
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.local
+
+import grizzled.slf4j.Logging
+import org.apache.samza.job.StreamJob
+import org.apache.samza.job.ApplicationStatus
+import org.apache.samza.job.ApplicationStatus.New
+import org.apache.samza.job.ApplicationStatus.Running
+import org.apache.samza.job.ApplicationStatus.SuccessfulFinish
+import org.apache.samza.job.ApplicationStatus.UnsuccessfulFinish
+
+/**
+ * StreamJob that executes the supplied Runnable on a thread inside this JVM.
+ *
+ * Status moves from New to Running on submit, and then to SuccessfulFinish
+ * when the runnable returns or UnsuccessfulFinish when it throws.
+ */
+class ThreadJob(runnable: Runnable) extends StreamJob with Logging {
+  @volatile var jobStatus: Option[ApplicationStatus] = None
+  var thread: Thread = null
+
+  /** Starts the runnable on a new non-daemon thread named "ThreadJob". */
+  def submit: StreamJob = {
+    jobStatus = Some(New)
+
+    // create a non-daemon thread to make job runner block until the job finishes.
+    // without this, the proc dies when job runner ends.
+    thread = new Thread {
+      override def run {
+        try {
+          runnable.run
+          jobStatus = Some(SuccessfulFinish)
+        } catch {
+          case e: Throwable => {
+            error("Failing job with exception.", e)
+            jobStatus = Some(UnsuccessfulFinish)
+            throw e
+          }
+        }
+      }
+    }
+    thread.setName("ThreadJob")
+
+    // Record Running before starting the thread: assigning it afterwards could
+    // overwrite the terminal status of a runnable that finishes immediately.
+    jobStatus = Some(Running)
+    thread.start
+
+    ThreadJob.this
+  }
+
+  /** Interrupts the job thread, if the job has been submitted. */
+  def kill: StreamJob = {
+    Option(thread).foreach(_.interrupt)
+    ThreadJob.this
+  }
+
+  /**
+   * Waits up to timeoutMs milliseconds for the job thread to end. As with
+   * Thread.join, a timeout of 0 waits indefinitely.
+   *
+   * @return the job status after waiting, or null if the job was never submitted
+   */
+  def waitForFinish(timeoutMs: Long) = {
+    Option(thread).foreach(_.join(timeoutMs))
+    jobStatus.getOrElse(null)
+  }
+
+  /**
+   * Polls every 500 ms until the job reaches the given status or timeoutMs
+   * milliseconds have elapsed.
+   *
+   * @return the job status when polling stopped, or null if the job was never submitted
+   */
+  def waitForStatus(status: ApplicationStatus, timeoutMs: Long) = {
+    val start = System.currentTimeMillis
+
+    while (System.currentTimeMillis - start < timeoutMs && !status.equals(jobStatus.getOrElse(null))) {
+      Thread.sleep(500)
+    }
+
+    jobStatus.getOrElse(null)
+  }
+
+  /** Current job status, or null if the job was never submitted. */
+  def getStatus = jobStatus.getOrElse(null)
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala b/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala
new file mode 100644
index 0000000..eee213d
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics
+
+import grizzled.slf4j.Logging
+import java.rmi.server.RMIServerSocketFactory
+import java.net.{ InetAddress, ServerSocket }
+import java.rmi.registry.LocateRegistry
+import management.ManagementFactory
+import java.util
+import javax.management.remote.{ JMXConnectorServerFactory, JMXServiceURL }
+import org.apache.samza.config.Config
+
+/**
+ * Starts a JMX connector server and the RMI registry it is published in from
+ * code rather than from JVM flags, so that the port actually bound can be
+ * discovered. This lets several processes on one machine each ask for an
+ * ephemeral port instead of colliding on a fixed one. The server is started
+ * as part of construction.
+ *
+ * Note: the JMX server runs in a separate thread and must be stopped or it
+ * will prevent the process from ending.
+ *
+ * @param requestedPort Port on which to start JMX server, 0 for ephemeral
+ */
+class JmxServer(requestedPort: Int) extends Logging {
+  /** Starts the server on an ephemeral port. */
+  def this() = this(0)
+
+  // Everything needed to build the connector server is computed in one block
+  // so that only the three results become members of the class.
+  val (jmxServer, url, actualPort) = {
+    // Socket factory that remembers the last server socket it created, so the
+    // bound port can be read back afterwards.
+    class UpfrontRMIServerSocketFactory extends RMIServerSocketFactory {
+      var lastSS: ServerSocket = null
+      def createServerSocket(port: Int): ServerSocket = {
+        val socket = new ServerSocket(port)
+        lastSS = socket
+        socket
+      }
+    }
+
+    // Sets the system property only when it has no value yet; an existing
+    // value is left untouched and logged.
+    def updateSystemProperty(prop: String, value: String) = {
+      Option(System.getProperty(prop)) match {
+        case Some(existingProp) =>
+          info("Not overriding system property %s as already has value %s" format (prop, existingProp))
+        case None =>
+          debug("Setting new system property of %s to %s" format (prop, value))
+          System.setProperty(prop, value)
+      }
+    }
+
+    if (System.getProperty("com.sun.management.jmxremote") != null) {
+      warn("System property com.sun.management.jmxremote has been specified, starting the JVM's JMX server as well. " +
+        "This behavior is not well defined and our values will collide with any set on command line.")
+    }
+
+    val hostname = InetAddress.getLocalHost.getHostName
+    info("According to InetAddress.getLocalHost.getHostName we are " + hostname)
+
+    // These properties are set before the registry and connector are created.
+    updateSystemProperty("com.sun.management.jmxremote.authenticate", "false")
+    updateSystemProperty("com.sun.management.jmxremote.ssl", "false")
+    updateSystemProperty("java.rmi.server.hostname", hostname)
+
+    // Create the registry through our factory, then read back the bound port.
+    val socketFactory = new UpfrontRMIServerSocketFactory
+    LocateRegistry.createRegistry(requestedPort, null, socketFactory)
+    val boundPort = socketFactory.lastSS.getLocalPort
+
+    val serviceUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + hostname + ":" + boundPort + "/jmxrmi")
+    val connectorEnv = new util.HashMap[String, Object]()
+    val platformMBeans = ManagementFactory.getPlatformMBeanServer
+    val connectorServer = JMXConnectorServerFactory.newJMXConnectorServer(serviceUrl, connectorEnv, platformMBeans)
+
+    (connectorServer, serviceUrl.toString, boundPort)
+  }
+
+  jmxServer.start
+  info("Started " + toString)
+
+  /**
+   * Get RMI port the JMX server is listening on.
+   * @return RMI port
+   */
+  def getPort = actualPort
+
+  /**
+   * Get Jmx URL for this server
+   * @return Jmx-Style URL string
+   */
+  def getJmxUrl = url
+
+  /**
+   * Stop the JMX server. Must be called at program end or will prevent termination.
+   */
+  def stop = jmxServer.stop
+
+  override def toString = "JmxServer port=%d url=%s" format (getPort, getJmxUrl)
+}
+