You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/07/17 21:41:40 UTC

git commit: SAMZA-337; compress configs passed through environment variables in YARN

Repository: incubator-samza
Updated Branches:
  refs/heads/master 40bbe4da5 -> 5aef56f4c


SAMZA-337; compress configs passed through environment variables in YARN


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/5aef56f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/5aef56f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/5aef56f4

Branch: refs/heads/master
Commit: 5aef56f4c169be354be1dda2ce6126cb67c83009
Parents: 40bbe4d
Author: Chinmay Soman <ch...@gmail.com>
Authored: Thu Jul 17 12:41:29 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Thu Jul 17 12:41:29 2014 -0700

----------------------------------------------------------------------
 build.gradle                                    |  4 ++
 gradle/dependency-versions.gradle               |  1 +
 .../samza/config/ShellCommandConfig.scala       |  8 ++++
 .../apache/samza/container/SamzaContainer.scala | 30 +++++++++++++--
 .../apache/samza/job/ShellCommandBuilder.scala  | 21 +++++++++--
 .../main/scala/org/apache/samza/util/Util.scala | 39 +++++++++++++++++++-
 .../scala/org/apache/samza/util/TestUtil.scala  | 24 ++++++++++++
 7 files changed, 119 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 6d0ac97..b201f77 100644
--- a/build.gradle
+++ b/build.gradle
@@ -114,6 +114,10 @@ project(":samza-core_$scalaVersion") {
     compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion"
     compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"
     compile "org.codehaus.jackson:jackson-jaxrs:$jacksonVersion"
+
+    // Temporary workaround to reduce config size via compression (SAMZA-337). Remove this
+    // once we figure out a clean way to do this.
+    compile "commons-codec:commons-codec:$commonsCodecVersion"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
     testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 7373582..c8bd830 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -32,4 +32,5 @@
   yarnVersion = "2.4.0"
   slf4jVersion = "1.6.2"
   guavaVersion = "17.0"
+  commonsCodecVersion = "1.9"
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
index 4c2d365..0cdc0d1 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
@@ -41,8 +41,14 @@ object ShellCommandConfig {
    */
   val ENV_JAVA_OPTS = "JAVA_OPTS"
 
+  /**
+   * Specifies whether the config for ENV_CONFIG and ENV_SYSTEM_STREAMS are compressed or not.
+   */
+  val ENV_COMPRESS_CONFIG = "SAMZA_COMPRESS_CONFIG"
+
   val COMMAND_SHELL_EXECUTE = "task.execute"
   val TASK_JVM_OPTS = "task.opts"
+  val COMPRESS_ENV_CONFIG = "task.config.compress"
 
   implicit def Config2ShellCommand(config: Config) = new ShellCommandConfig(config)
 }
@@ -51,4 +57,6 @@ class ShellCommandConfig(config: Config) extends ScalaMapConfig(config) {
   def getCommand = getOption(ShellCommandConfig.COMMAND_SHELL_EXECUTE).getOrElse("bin/run-container.sh")
 
   def getTaskOpts = getOption(ShellCommandConfig.TASK_JVM_OPTS)
+
+  def isEnvConfigCompressed = getBoolean(ShellCommandConfig.COMPRESS_ENV_CONFIG, false)
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index b303615..bff6000 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -65,13 +65,37 @@ import org.apache.samza.checkpoint.OffsetManager
 import org.apache.samza.system.StreamMetadataCache
 
 object SamzaContainer extends Logging {
+
+  /**
+   * Resolve the usable value of a parameter, decompressing it first when it was compressed
+   * @param param The raw parameter value; handed to Util.decompress when isCompressed is true
+   * @return The decompressed value when isCompressed is true, otherwise param unchanged
+   */
+  def getParameter(param: String, isCompressed: Boolean) : String = {
+    if (!isCompressed) {
+      info("Parameter is uncompressed. Using it as-is")
+      param
+    } else {
+      info("Compression is ON !")
+      val expanded = Util.decompress(param)
+      info("Got param = " + expanded)
+      expanded
+    }
+  }
+
   def main(args: Array[String]) {
     val jmxServer = new JmxServer
     val containerName = System.getenv(ShellCommandConfig.ENV_CONTAINER_NAME)
-    val configStr = System.getenv(ShellCommandConfig.ENV_CONFIG)
-    val config = JsonConfigSerializer.fromJson(configStr)
-    val encodedStreamsAndPartitions = System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS)
 
+    /**
+     * If the compressed option is enabled in config, de-compress the 'ENV_CONFIG' and 'ENV_SYSTEM_STREAMS'
+     * properties. Note: This is a temporary workaround to reduce the size of the config and hence size
+     * of the environment variable(s) exported while starting a Samza container (SAMZA-337)
+     */
+    val isCompressed = if (System.getenv(ShellCommandConfig.ENV_COMPRESS_CONFIG).equals("TRUE")) true else false
+    val configStr = getParameter(System.getenv(ShellCommandConfig.ENV_CONFIG), isCompressed)
+    val config = JsonConfigSerializer.fromJson(configStr)
+    val encodedStreamsAndPartitions = getParameter(System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS), isCompressed)
     val partitions = Util.deserializeSSPSetFromJSON(encodedStreamsAndPartitions)
 
     if (partitions.isEmpty) {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
index f8865b1..4635bb2 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
@@ -29,12 +29,25 @@ class ShellCommandBuilder extends CommandBuilder {
   def buildCommand() = config.getCommand
 
   def buildEnvironment(): java.util.Map[String, String] = {
-    val streamsAndPartsString = Util.serializeSSPSetToJSON(systemStreamPartitions.toSet) // Java to Scala set conversion
-    
+    /**
+     * When compression is enabled in config, the 'ENV_CONFIG' and 'ENV_SYSTEM_STREAMS' values are
+     * compressed before being exported. Note: This is a temporary workaround to reduce the size of
+     * the environment variable(s) exported while starting a Samza container (SAMZA-337)
+     */
+    val compressEnv = config.isEnvConfigCompressed
+    val isCompressed = if (compressEnv) "TRUE" else "FALSE"
+    val rawStreamsAndParts = Util.serializeSSPSetToJSON(systemStreamPartitions.toSet) // Java to Scala set conversion
+    val rawConfig = JsonConfigSerializer.toJson(config)
+
+    // Both values follow the same flag, which is exported below as ENV_COMPRESS_CONFIG.
+    val streamsAndPartsString = if (compressEnv) Util.compress(rawStreamsAndParts) else rawStreamsAndParts
+    val envConfig = if (compressEnv) Util.compress(rawConfig) else rawConfig
+
     Map(
       ShellCommandConfig.ENV_CONTAINER_NAME -> name,
       ShellCommandConfig.ENV_SYSTEM_STREAMS -> streamsAndPartsString,
-      ShellCommandConfig.ENV_CONFIG -> JsonConfigSerializer.toJson(config),
-      ShellCommandConfig.ENV_JAVA_OPTS -> config.getTaskOpts.getOrElse(""))
+      ShellCommandConfig.ENV_CONFIG -> envConfig,
+      ShellCommandConfig.ENV_JAVA_OPTS -> config.getTaskOpts.getOrElse(""),
+      ShellCommandConfig.ENV_COMPRESS_CONFIG -> isCompressed)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 11c23d0..60d96c9 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -19,10 +19,12 @@
 
 package org.apache.samza.util
 
-import java.io.File
+import java.io._
 import java.lang.management.ManagementFactory
 import java.util.Random
+import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 import grizzled.slf4j.Logging
+import org.apache.commons.codec.binary.Base64
 import org.apache.samza.{ Partition, SamzaException }
 import org.apache.samza.config.Config
 import org.apache.samza.config.SystemConfig.Config2System
@@ -180,4 +182,39 @@ object Util extends Logging {
   def getContainerPID(): String = {
     ManagementFactory.getRuntimeMXBean().getName()
   }
+
+  /**
+   * Returns a compressed + base64 encoded representation of the given string
+   * @param origStr Original string to be processed
+   * @return A processed string after compression (gzip) of its UTF-8 bytes and encoding (base64)
+   */
+  def compress(origStr: String): String = {
+    val bos = new ByteArrayOutputStream()
+    val gzos = new GZIPOutputStream(bos)
+    // Fixed charset: the value is decoded in a separate container JVM whose default may differ.
+    // close() finishes the gzip stream, so it has to run before the buffered bytes are read.
+    try gzos.write(origStr.getBytes("UTF-8")) finally gzos.close()
+    // Closing a ByteArrayOutputStream has no effect, so bos needs no explicit close.
+    Base64.encodeBase64String(bos.toByteArray)
+  }
+
+  /**
+   * Returns the original string from the given compressed string. This function assumes
+   * that the given string is compressed + base64 encoded (using the above compress function).
+   * Bytes are copied verbatim and decoded as UTF-8, so line breaks in the original survive.
+   * @param compresedStr Specified string to decompress
+   * @return Original string after decoding (base64) and uncompressing (gzip)
+   */
+  def decompress(compresedStr: String) : String = {
+    val gzis = new GZIPInputStream(new ByteArrayInputStream(Base64.decodeBase64(compresedStr)))
+    val bos = new ByteArrayOutputStream()
+    val buffer = new Array[Byte](4096)
+    // Copy raw bytes: readLine() would drop the line terminators of a multi-line value.
+    var count = gzis.read(buffer)
+    while (count != -1) {
+      bos.write(buffer, 0, count)
+      count = gzis.read(buffer)
+    }
+    new String(bos.toByteArray, "UTF-8")
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index a67ecdf..5ed167d 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -29,7 +29,11 @@ import org.apache.samza.system.SystemFactory
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.config.Config
 
+import scala.util.Random
+
 class TestUtil {
+  val random = new Random(System.currentTimeMillis())
+
   @Test
   def testGetInputStreamPartitions {
     val expectedPartitionsPerStream = 1
@@ -94,6 +98,26 @@ class TestUtil {
     val backToStreamsAndParts = deserializeSSPSetFromJSON(asString)
     assertEquals(streamsAndParts, backToStreamsAndParts)
   }
+
+  /**
+   * Build a random string made up only of letters and digits
+   * @param length Number of characters the returned string should contain
+   * @return A string of the requested length drawn from the shared random source;
+   *         the source is seeded from the clock, so the content differs between runs
+   */
+  def generateString (length : Int) : String =
+    random.alphanumeric.take(length).mkString("")
+
+  @Test
+  def testCompressAndDecompressUtility() {
+    // Round-trip a fresh random alphanumeric string for every length from 10 to 999.
+    for (len <- 10 until 1000) {
+      val sample = generateString(len)
+      val compressedStr = Util.compress(sample)
+      val deCompressedStr = Util.decompress(compressedStr)
+      assertEquals(sample, deCompressedStr)
+    }
+  }
 }
 
 /**