You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/07/17 21:41:40 UTC
git commit: SAMZA-337; compress configs passed through environment variables in YARN
Repository: incubator-samza
Updated Branches:
refs/heads/master 40bbe4da5 -> 5aef56f4c
SAMZA-337; compress configs passed through environment variables in YARN
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/5aef56f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/5aef56f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/5aef56f4
Branch: refs/heads/master
Commit: 5aef56f4c169be354be1dda2ce6126cb67c83009
Parents: 40bbe4d
Author: Chinmay Soman <ch...@gmail.com>
Authored: Thu Jul 17 12:41:29 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Thu Jul 17 12:41:29 2014 -0700
----------------------------------------------------------------------
build.gradle | 4 ++
gradle/dependency-versions.gradle | 1 +
.../samza/config/ShellCommandConfig.scala | 8 ++++
.../apache/samza/container/SamzaContainer.scala | 30 +++++++++++++--
.../apache/samza/job/ShellCommandBuilder.scala | 21 +++++++++--
.../main/scala/org/apache/samza/util/Util.scala | 39 +++++++++++++++++++-
.../scala/org/apache/samza/util/TestUtil.scala | 24 ++++++++++++
7 files changed, 119 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 6d0ac97..b201f77 100644
--- a/build.gradle
+++ b/build.gradle
@@ -114,6 +114,10 @@ project(":samza-core_$scalaVersion") {
compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion"
compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"
compile "org.codehaus.jackson:jackson-jaxrs:$jacksonVersion"
+
+ // Temporary workaround to reduce config size via compression (SAMZA-337). Remove this
+ // once we figure out a clean way to do this.
+ compile "commons-codec:commons-codec:$commonsCodecVersion"
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 7373582..c8bd830 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -32,4 +32,5 @@
yarnVersion = "2.4.0"
slf4jVersion = "1.6.2"
guavaVersion = "17.0"
+ commonsCodecVersion = "1.9"
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
index 4c2d365..0cdc0d1 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
@@ -41,8 +41,14 @@ object ShellCommandConfig {
*/
val ENV_JAVA_OPTS = "JAVA_OPTS"
+ /**
+ * Name of the environment variable that specifies whether the values of ENV_CONFIG and
+ * ENV_SYSTEM_STREAMS are compressed (gzip + base64, see Util.compress).
+ * ShellCommandBuilder sets it to "TRUE" or "FALSE"; SamzaContainer treats only the exact
+ * value "TRUE" as compressed.
+ */
+ val ENV_COMPRESS_CONFIG = "SAMZA_COMPRESS_CONFIG"
+
val COMMAND_SHELL_EXECUTE = "task.execute"
val TASK_JVM_OPTS = "task.opts"
+ /**
+ * Job config key (boolean) that, when true, makes ShellCommandBuilder compress the
+ * ENV_CONFIG and ENV_SYSTEM_STREAMS environment variables (temporary workaround, SAMZA-337).
+ */
+ val COMPRESS_ENV_CONFIG = "task.config.compress"
implicit def Config2ShellCommand(config: Config) = new ShellCommandConfig(config)
}
@@ -51,4 +57,6 @@ class ShellCommandConfig(config: Config) extends ScalaMapConfig(config) {
def getCommand = getOption(ShellCommandConfig.COMMAND_SHELL_EXECUTE).getOrElse("bin/run-container.sh")
def getTaskOpts = getOption(ShellCommandConfig.TASK_JVM_OPTS)
+
+ /**
+ * Reads the task.config.compress flag (COMPRESS_ENV_CONFIG); false is passed to getBoolean
+ * as the default, presumably used when the key is not set.
+ */
+ def isEnvConfigCompressed = getBoolean(ShellCommandConfig.COMPRESS_ENV_CONFIG, false)
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index b303615..bff6000 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -65,13 +65,37 @@ import org.apache.samza.checkpoint.OffsetManager
import org.apache.samza.system.StreamMetadataCache
object SamzaContainer extends Logging {
+
+ /**
+ * Returns the usable value of a container environment parameter, decompressing it first if
+ * the launcher compressed it (see ShellCommandConfig.ENV_COMPRESS_CONFIG).
+ *
+ * @param param The raw parameter value read from the environment
+ * @param isCompressed Whether param was produced by Util.compress and must be decompressed
+ * @return The parameter value, decompressed if necessary
+ */
+ def getParameter(param: String, isCompressed: Boolean) : String = {
+ if (isCompressed) {
+ info("Compression is ON !")
+ val decompressedParam = Util.decompress(param)
+ // The decompressed value can be the entire job config (large, and possibly containing
+ // credentials), so only its size is logged at info level; the full value goes to debug.
+ info("Decompressed parameter from " + param.length + " to " + decompressedParam.length + " characters.")
+ debug("Got param = " + decompressedParam)
+ decompressedParam
+ } else {
+ info("Parameter is uncompressed. Using it as-is")
+ param
+ }
+ }
+
def main(args: Array[String]) {
val jmxServer = new JmxServer
val containerName = System.getenv(ShellCommandConfig.ENV_CONTAINER_NAME)
- val configStr = System.getenv(ShellCommandConfig.ENV_CONFIG)
- val config = JsonConfigSerializer.fromJson(configStr)
- val encodedStreamsAndPartitions = System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS)
+ /**
+ * If the launcher compressed the 'ENV_CONFIG' and 'ENV_SYSTEM_STREAMS' properties (signalled by
+ * ENV_COMPRESS_CONFIG being exactly "TRUE"), de-compress them. A missing ENV_COMPRESS_CONFIG is
+ * treated as "not compressed" instead of failing with a NullPointerException.
+ * Note: This is a temporary workaround to reduce the size of the config and hence size
+ * of the environment variable(s) exported while starting a Samza container (SAMZA-337)
+ */
+ val isCompressed = Option(System.getenv(ShellCommandConfig.ENV_COMPRESS_CONFIG)).exists(_ == "TRUE")
+ val configStr = getParameter(System.getenv(ShellCommandConfig.ENV_CONFIG), isCompressed)
+ val config = JsonConfigSerializer.fromJson(configStr)
+ val encodedStreamsAndPartitions = getParameter(System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS), isCompressed)
val partitions = Util.deserializeSSPSetFromJSON(encodedStreamsAndPartitions)
if (partitions.isEmpty) {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
index f8865b1..4635bb2 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
@@ -29,12 +29,25 @@ class ShellCommandBuilder extends CommandBuilder {
def buildCommand() = config.getCommand
def buildEnvironment(): java.util.Map[String, String] = {
- val streamsAndPartsString = Util.serializeSSPSetToJSON(systemStreamPartitions.toSet) // Java to Scala set conversion
-
+ val compressEnv = config.isEnvConfigCompressed
+
+ /**
+ * If the compressed option is enabled in config, compress the 'ENV_CONFIG' and 'ENV_SYSTEM_STREAMS'
+ * properties. Note: This is a temporary workaround to reduce the size of the config and hence size
+ * of the environment variable(s) exported while starting a Samza container (SAMZA-337)
+ */
+ def encodeForEnv(value: String): String = if (compressEnv) Util.compress(value) else value
+
+ // Java to Scala set conversion
+ val streamsAndPartsString = encodeForEnv(Util.serializeSSPSetToJSON(systemStreamPartitions.toSet))
+ val envConfig = encodeForEnv(JsonConfigSerializer.toJson(config))
+ // SamzaContainer reads this flag back and decompresses only when it equals "TRUE".
+ val compressFlag = if (compressEnv) "TRUE" else "FALSE"
+
Map(
ShellCommandConfig.ENV_CONTAINER_NAME -> name,
ShellCommandConfig.ENV_SYSTEM_STREAMS -> streamsAndPartsString,
- ShellCommandConfig.ENV_CONFIG -> JsonConfigSerializer.toJson(config),
- ShellCommandConfig.ENV_JAVA_OPTS -> config.getTaskOpts.getOrElse(""))
+ ShellCommandConfig.ENV_CONFIG -> envConfig,
+ ShellCommandConfig.ENV_JAVA_OPTS -> config.getTaskOpts.getOrElse(""),
+ ShellCommandConfig.ENV_COMPRESS_CONFIG -> compressFlag)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 11c23d0..60d96c9 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -19,10 +19,12 @@
package org.apache.samza.util
-import java.io.File
+import java.io._
import java.lang.management.ManagementFactory
import java.util.Random
+import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import grizzled.slf4j.Logging
+import org.apache.commons.codec.binary.Base64
import org.apache.samza.{ Partition, SamzaException }
import org.apache.samza.config.Config
import org.apache.samza.config.SystemConfig.Config2System
@@ -180,4 +182,39 @@ object Util extends Logging {
def getContainerPID(): String = {
ManagementFactory.getRuntimeMXBean().getName()
}
+
+ /**
+ * Returns a gzip-compressed, base64-encoded representation of the given string.
+ * The string is encoded as UTF-8 before compression, so the result does not depend on the
+ * default charset of the JVM that produces it.
+ *
+ * @param origStr Original string to be processed
+ * @return A processed string after compression (gzip) and encoding (base64)
+ */
+ def compress(origStr: String): String = {
+ val bos = new ByteArrayOutputStream()
+ val gzos = new GZIPOutputStream(bos)
+ try {
+ gzos.write(origStr.getBytes("UTF-8"))
+ } finally {
+ // Closing the gzip stream flushes the remaining compressed data and the gzip trailer into bos.
+ gzos.close()
+ }
+ Base64.encodeBase64String(bos.toByteArray)
+ }
+
+ /**
+ * Returns the original string from the given compressed string. This function assumes
+ * that the given string is gzip-compressed, base64-encoded UTF-8 text, as produced by the
+ * compress function above.
+ *
+ * The decompressed bytes are read in full rather than line by line, so line terminators in
+ * the original string are preserved and the round trip is lossless.
+ *
+ * @param compresedStr Specified string to decompress
+ * @return Original string after decoding (base64) and uncompressing (gzip)
+ */
+ def decompress(compresedStr: String) : String = {
+ val rawBytes = Base64.decodeBase64(compresedStr)
+ val gzis = new GZIPInputStream(new ByteArrayInputStream(rawBytes))
+ try {
+ val bos = new ByteArrayOutputStream()
+ val buffer = new Array[Byte](4096)
+ var bytesRead = gzis.read(buffer)
+ while (bytesRead != -1) {
+ bos.write(buffer, 0, bytesRead)
+ bytesRead = gzis.read(buffer)
+ }
+ new String(bos.toByteArray, "UTF-8")
+ } finally {
+ gzis.close()
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index a67ecdf..5ed167d 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -29,7 +29,11 @@ import org.apache.samza.system.SystemFactory
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.config.Config
+import scala.util.Random
+
class TestUtil {
+ val random = new Random(System.currentTimeMillis())
+
@Test
def testGetInputStreamPartitions {
val expectedPartitionsPerStream = 1
@@ -94,6 +98,26 @@ class TestUtil {
val backToStreamsAndParts = deserializeSSPSetFromJSON(asString)
assertEquals(streamsAndParts, backToStreamsAndParts)
}
+
+ /**
+ * Generate a random alphanumeric string of the specified length
+ * @param length Specifies length of the string to generate
+ * @return An alphanumeric string
+ */
+ def generateString (length : Int) : String = {
+ random.alphanumeric.take(length).mkString
+ }
+
+ /**
+ * Round-trips random alphanumeric strings of every length in [0, 1000) through
+ * Util.compress / Util.decompress and checks that the original string is recovered.
+ * Length 0 covers the empty-string edge case.
+ */
+ @Test
+ def testCompressAndDecompressUtility() {
+ (0 until 1000).foreach(len => {
+ val sample = generateString(len)
+ val compressedStr = Util.compress(sample)
+ val deCompressedStr = Util.decompress(compressedStr)
+ assertEquals("Round trip failed for a string of length " + len, sample, deCompressedStr)
+ })
+ }
}
/**