Posted to commits@samza.apache.org by pm...@apache.org on 2018/04/18 20:32:28 UTC

[2/3] samza git commit: Misc. Util cleanup

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
new file mode 100644
index 0000000..4b93543
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.util
+
+import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
+import java.util.zip.CRC32
+
+import org.apache.samza.util.Util.info
+
+object FileUtil {
+  /**
+    * Writes the checksum and data to a file.
+    * The CRC32 checksum is prefixed to the data and written as a long.
+    * @param file The file handle to write to
+    * @param data The data to be written to the file
+    * */
+  def writeWithChecksum(file: File, data: String) = {
+    val checksum = getChecksum(data)
+    var oos: ObjectOutputStream = null
+    var fos: FileOutputStream = null
+    try {
+      fos = new FileOutputStream(file)
+      oos = new ObjectOutputStream(fos)
+      oos.writeLong(checksum)
+      oos.writeUTF(data)
+    } finally {
+      oos.close()
+      fos.close()
+    }
+  }
+
+  /**
+    * Reads from a file that has a checksum prepended to the data
+    * @param file The file handle to read from
+    * */
+  def readWithChecksum(file: File) = {
+    var fis: FileInputStream = null
+    var ois: ObjectInputStream = null
+    try {
+      fis = new FileInputStream(file)
+      ois = new ObjectInputStream(fis)
+      val checksumFromFile = ois.readLong()
+      val data = ois.readUTF()
+      if(checksumFromFile == getChecksum(data)) {
+        data
+      } else {
+        info("Checksum match failed. Data in file is corrupted. Skipping content.")
+        null
+      }
+    } finally {
+      ois.close()
+      fis.close()
+    }
+  }
+
+  /**
+    * Recursively remove a directory (or file), and all sub-directories. Equivalent
+    * to rm -rf.
+    */
+  def rm(file: File) {
+    if (file == null) {
+      return
+    } else if (file.isDirectory) {
+      val files = file.listFiles()
+      if (files != null) {
+        for (f <- files)
+          rm(f)
+      }
+      file.delete()
+    } else {
+      file.delete()
+    }
+  }
+
+  /**
+    * Generates the CRC32 checksum for the given data.
+    * @param data The string for which the checksum is generated
+    * @return the checksum value as a long
+    * */
+  def getChecksum(data: String) = {
+    val crc = new CRC32
+    crc.update(data.getBytes)
+    crc.getValue
+  }
+}
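
A quick usage sketch of the new FileUtil helpers, assuming a writable java.io.tmpdir; the OFFSET file name and the "100" value are placeholders that only mirror how TaskStorageManager's tests use these methods later in this patch.

    import java.io.File
    import org.apache.samza.util.FileUtil

    object FileUtilExample extends App {
      // Write "100" prefixed with its CRC32 checksum, then read it back.
      val offsetFile = new File(System.getProperty("java.io.tmpdir"), "OFFSET")
      FileUtil.writeWithChecksum(offsetFile, "100")

      // Returns the data if the stored checksum matches, or null if the file is corrupted.
      val offset = FileUtil.readWithChecksum(offsetFile)
      println(offset)  // 100

      // Recursively delete the file (or a directory), like rm -rf.
      FileUtil.rm(offsetFile)
    }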

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala
new file mode 100644
index 0000000..ea5eb5a
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.util
+
+import java.io.{BufferedReader, IOException, InputStream, InputStreamReader}
+import java.net.{HttpURLConnection, URL}
+
+import org.apache.samza.SamzaException
+import org.apache.samza.util.Util.{error, warn}
+
+object HttpUtil {
+
+  /**
+    * Reads a URL and returns the response body as a string. Retries with exponential backoff, but does no other error handling.
+    *
+    * @param url HTTP URL to read from.
+    * @param timeout how long to wait before timing out when connecting to or reading from the HTTP server.
+    * @param retryBackoff the ExponentialSleepStrategy that determines how long to sleep between retries
+    * @return string payload of the body of the HTTP response.
+    */
+  def read(url: URL, timeout: Int = 60000, retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy): String = {
+    var httpConn = getHttpConnection(url, timeout)
+    retryBackoff.run(loop => {
+      if(httpConn.getResponseCode != 200)
+      {
+        warn("Error: " + httpConn.getResponseCode)
+        val errorContent = readStream(httpConn.getErrorStream)
+        warn("Error reading stream, failed with response %s" format errorContent)
+        httpConn = getHttpConnection(url, timeout)
+      }
+      else
+      {
+        loop.done
+      }
+    },
+      (exception, loop) => {
+        exception match {
+          case ioe: IOException => {
+            warn("Error getting response from Job coordinator server. received IOException: %s. Retrying..." format ioe.getClass)
+            httpConn = getHttpConnection(url, timeout)
+          }
+          case e: Exception =>
+            loop.done
+            error("Unable to connect to Job coordinator server, received exception", e)
+            throw e
+        }
+      })
+
+    if(httpConn.getResponseCode != 200) {
+      throw new SamzaException("Unable to read JobModel from Jobcoordinator HTTP server")
+    }
+    readStream(httpConn.getInputStream)
+  }
+
+  def getHttpConnection(url: URL, timeout: Int): HttpURLConnection = {
+    val conn = url.openConnection()
+    conn.setConnectTimeout(timeout)
+    conn.setReadTimeout(timeout)
+    conn.asInstanceOf[HttpURLConnection]
+  }
+
+  private def readStream(stream: InputStream): String = {
+    val br = new BufferedReader(new InputStreamReader(stream))
+    var line: String = null
+    val body = Iterator.continually(br.readLine()).takeWhile(_ != null).mkString
+    br.close
+    stream.close
+    body
+  }
+}
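
A hedged sketch of calling the new HttpUtil.read; the URL and timeout here are placeholders (TestJobCoordinator below reads the real coordinator URL from coordinator.server.getUrl).

    import java.net.URL
    import org.apache.samza.util.{ExponentialSleepStrategy, HttpUtil}

    object HttpUtilExample extends App {
      // Placeholder endpoint; a real caller would use the job coordinator server's URL.
      val url = new URL("http://localhost:5555/")

      // Retries non-200 responses and IOExceptions with exponential backoff,
      // then returns the response body as a string.
      val body = HttpUtil.read(url, timeout = 30000, retryBackoff = new ExponentialSleepStrategy)
      println(body)
    }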

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala b/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala
deleted file mode 100644
index 93c5220..0000000
--- a/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.util;
-
-import java.util.Comparator
-
-/**
- * A comparator that applies a lexicographical comparison on byte arrays.
- */
-class LexicographicComparator extends Comparator[Array[Byte]] {
-  def compare(k1: Array[Byte], k2: Array[Byte]): Int = {
-    val l = math.min(k1.length, k2.length)
-    var i = 0
-    while (i < l) {
-      if (k1(i) != k2(i))
-        return (k1(i) & 0xff) - (k2(i) & 0xff)
-      i += 1
-    }
-    // okay prefixes are equal, the shorter array is less
-    k1.length - k2.length
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala
new file mode 100644
index 0000000..f3ba746
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.util
+
+import scala.collection.immutable.Map
+import scala.collection.JavaConverters._
+import scala.runtime.AbstractFunction0
+
+object ScalaJavaUtil {
+
+  /**
+    * Convert a Java map to a Scala immutable Map
+    * */
+  def toScalaMap[K, V](javaMap: java.util.Map[K, V]): Map[K, V] = {
+    javaMap.asScala.toMap
+  }
+
+  /**
+    * Wraps the provided value in a Scala Function, e.g. for supplying a default to [[scala.Option#getOrElse]] from Java
+    *
+    * @param value the value to be wrapped
+    * @tparam T type of the value
+    * @return an AbstractFunction0 that returns the contained value when called
+    */
+  def defaultValue[T](value: T): AbstractFunction0[T] = {
+    new AbstractFunction0[T] {
+      override def apply(): T = value
+    }
+  }
+
+  /**
+    * Wraps the provided Java Supplier in a Scala Function, e.g. for supplying a default to [[scala.Option#getOrElse]] from Java
+    *
+    * @param javaFunction the java Supplier function to be wrapped
+    * @tparam T type of the value
+    * @return an AbstractFunction0 that returns the Supplier's result when called
+    */
+  def toScalaFunction[T](javaFunction: java.util.function.Supplier[T]): AbstractFunction0[T] = {
+    new AbstractFunction0[T] {
+      override def apply(): T = javaFunction.get()
+    }
+  }
+}
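
A small sketch of the ScalaJavaUtil helpers; the map contents and job-name strings are arbitrary placeholders.

    import java.util.function.Supplier
    import org.apache.samza.util.ScalaJavaUtil

    object ScalaJavaUtilExample extends App {
      // Convert a mutable Java map into an immutable Scala Map.
      val javaMap = new java.util.HashMap[String, Integer]()
      javaMap.put("partitions", Integer.valueOf(4))
      val scalaMap = ScalaJavaUtil.toScalaMap(javaMap)
      println(scalaMap("partitions"))  // 4

      // Both helpers yield a scala.Function0 that produces the wrapped value when applied.
      val fromValue: () => String = ScalaJavaUtil.defaultValue("default-job")
      val fromSupplier: () => String = ScalaJavaUtil.toScalaFunction(new Supplier[String] {
        override def get(): String = "supplied-job"
      })
      println(fromValue())     // default-job
      println(fromSupplier())  // supplied-job
    }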

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/main/scala/org/apache/samza/util/TimerUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/TimerUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/TimerUtil.scala
new file mode 100644
index 0000000..b538145
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/TimerUtil.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util
+
+import org.apache.samza.metrics.Timer
+
+/**
+ * A helper trait to facilitate updating [[org.apache.samza.metrics.Timer]] metrics
+ */
+trait TimerUtil {
+  val clock: () => Long
+
+  /**
+   * A helper method to update the [[org.apache.samza.metrics.Timer]] metric.
+   * It accepts a [[org.apache.samza.metrics.Timer]] instance and a code block.
+   * It updates the Timer instance with the duration of running the code block.
+   */
+  def updateTimer[T](timer: Timer)(runCodeBlock: => T): T = {
+    val startingTime = clock()
+    val returnValue = runCodeBlock
+    timer.update(clock() - startingTime)
+    returnValue
+  }
+
+  /**
+   * A helper method to update the [[org.apache.samza.metrics.Timer]] metrics.
+   * It accepts a [[org.apache.samza.metrics.Timer]] instance and a code block
+   * with no return value. It passes the current time in nanoseconds to the code block
+   * as a Long parameter. It updates the Timer instance with the duration of running
+   * the code block and returns the same duration.
+   */
+  def updateTimerAndGetDuration(timer: Timer)(runCodeBlock: Long => Unit): Long = {
+    val startingTime = clock()
+    runCodeBlock(startingTime)
+    val duration = clock() - startingTime
+    timer.update(duration)
+    duration
+  }
+}
\ No newline at end of file
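
A sketch of mixing the renamed TimerUtil trait into a component; the Timer name, the nanoTime clock, and the work inside the blocks are assumptions for illustration.

    import org.apache.samza.metrics.Timer
    import org.apache.samza.util.TimerUtil

    // Hypothetical component that times its work with the helpers above.
    class TimedChooser extends TimerUtil {
      override val clock: () => Long = () => System.nanoTime()
      val chooseNs = new Timer("choose-ns")

      // Records the elapsed time in the Timer and returns the block's result.
      def choose(): Int = updateTimer(chooseNs) {
        (1 to 1000).sum
      }

      // Records the duration and also returns it; the block receives the start timestamp.
      def timedNoOp(): Long = updateTimerAndGetDuration(chooseNs) { startTimeNs =>
        println(s"started at $startTimeNs ns")  // a real implementation would do work here
      }
    }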

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala b/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
deleted file mode 100644
index 63935a7..0000000
--- a/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.util
-
-import org.apache.samza.metrics.Timer
-
-/**
- * a helper class to facilitate update {@link org.apache.samza.metrics.Timer} metric
- */
-trait TimerUtils {
-  val clock: () => Long
-
-  /**
-   * A helper method to update the {@link org.apache.samza.metrics.Timer} metric.
-   * It accepts a {@link org.apache.samza.metrics.Timer} instance and a code block.
-   * It updates the Timer instance with the duration of running code block.
-   */
-  def updateTimer[T](timer: Timer)(runCodeBlock: => T): T = {
-    val startingTime = clock()
-    val returnValue = runCodeBlock
-    timer.update(clock() - startingTime)
-    returnValue
-  }
-
-  /**
-   * A helper method to update the {@link org.apache.samza.metrics.Timer} metrics.
-   * It accepts a {@link org.apache.samza.metrics.Timer} instance and a code block
-   * with no return value. It passes one Long parameter to code block that contains
-   * current time in nanoseconds. It updates the Timer instance with the duration of
-   * running code block and returns the same duration.
-   */
-  def updateTimerAndGetDuration(timer: Timer)(runCodeBlock: Long => Unit): Long = {
-    val startingTime = clock()
-    runCodeBlock(startingTime)
-    val duration = clock() - startingTime
-    timer.update(duration)
-    duration
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index e12c81a..059eb03 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -19,28 +19,25 @@
 
 package org.apache.samza.util
 
-import java.io._
-import java.lang.management.ManagementFactory
-import java.net._
-import java.util.Random
-import java.util.zip.CRC32
 
 import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config._
-import org.apache.samza.serializers._
-import org.apache.samza.system.{SystemFactory, SystemStream, SystemStreamPartition}
-import org.apache.samza.{Partition, SamzaException}
+import org.apache.samza.system.SystemStream
+import org.apache.samza.SamzaException
+
+import java.lang.management.ManagementFactory
+import java.net.Inet4Address
+import java.net.InetAddress
+import java.net.NetworkInterface
+import java.util.Random
 
 import scala.collection.JavaConverters._
-import scala.collection.immutable.Map
 
 
 object Util extends Logging {
   val Random = new Random
   val ThreadMxBean = ManagementFactory.getThreadMXBean
 
-  def clock: Long = System.currentTimeMillis
   /**
    * Make an environment variable string safe to pass.
    */
@@ -53,28 +50,9 @@ object Util extends Logging {
     startInclusive + Random.nextInt(endExclusive - startInclusive)
 
   /**
-   * Recursively remove a directory (or file), and all sub-directories. Equivalent
-   * to rm -rf.
+   * Instantiate an object of type T from a given className.
    */
-  def rm(file: File) {
-    if (file == null) {
-      return
-    } else if (file.isDirectory) {
-      val files = file.listFiles()
-      if (files != null) {
-        for (f <- files)
-          rm(f)
-      }
-      file.delete()
-    } else {
-      file.delete()
-    }
-  }
-
-  /**
-   * Instantiate a class instance from a given className.
-   */
-  def getObj[T](className: String) = {
+  def getObj[T](className: String, clazz: Class[T]) = {
     try {
       Class
         .forName(className)
@@ -82,7 +60,7 @@ object Util extends Logging {
         .asInstanceOf[T]
     } catch {
       case e: Throwable => {
-        error("Unable to instantiate a class instance for %s." format className, e)
+        error("Unable to create an instance for class %s." format className, e)
         throw e
       }
     }
@@ -109,244 +87,22 @@ object Util extends Logging {
   }
 
   /**
-   * Makes sure that an object is not null, and throws a NullPointerException
-   * if it is.
-   */
-  def notNull[T](obj: T, msg: String) = if (obj == null) {
-    throw new NullPointerException(msg)
-  }
-
-  /**
-   * Returns the name representing the JVM. It usually contains the PID of the process plus some additional information
-   * @return String that contains the name representing this JVM
-   */
-  def getContainerPID(): String = {
-    ManagementFactory.getRuntimeMXBean().getName()
-  }
-
-  /**
-   * Overriding read method defined below so that it can be accessed from Java classes with default values
-   */
-  def read(url: URL, timeout: Int): String = {
-    read(url, timeout, new ExponentialSleepStrategy)
-  }
-
-  /**
-   * Reads a URL and returns its body as a string. Does no error handling.
-   *
-   * @param url HTTP URL to read from.
-   * @param timeout How long to wait before timing out when connecting to or reading from the HTTP server.
-   * @param retryBackoff Instance of exponentialSleepStrategy that encapsulates info on how long to sleep and retry operation
-   * @return String payload of the body of the HTTP response.
-   */
-  def read(url: URL, timeout: Int = 60000, retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy): String = {
-    var httpConn = getHttpConnection(url, timeout)
-    retryBackoff.run(loop => {
-      if(httpConn.getResponseCode != 200)
-      {
-        warn("Error: " + httpConn.getResponseCode)
-        val errorContent = readStream(httpConn.getErrorStream)
-        warn("Error reading stream, failed with response %s" format errorContent)
-        httpConn = getHttpConnection(url, timeout)
-      }
-      else
-      {
-        loop.done
-      }
-    },
-    (exception, loop) => {
-      exception match {
-        case ioe: IOException => {
-          warn("Error getting response from Job coordinator server. received IOException: %s. Retrying..." format ioe.getClass)
-          httpConn = getHttpConnection(url, timeout)
-        }
-        case e: Exception =>
-          loop.done
-          error("Unable to connect to Job coordinator server, received exception", e)
-          throw e
-      }
-    })
-
-    if(httpConn.getResponseCode != 200) {
-      throw new SamzaException("Unable to read JobModel from Jobcoordinator HTTP server")
-    }
-    readStream(httpConn.getInputStream)
-  }
-
-  def getHttpConnection(url: URL, timeout: Int): HttpURLConnection = {
-    val conn = url.openConnection()
-    conn.setConnectTimeout(timeout)
-    conn.setReadTimeout(timeout)
-    conn.asInstanceOf[HttpURLConnection]
-  }
-  private def readStream(stream: InputStream): String = {
-    val br = new BufferedReader(new InputStreamReader(stream));
-    var line: String = null;
-    val body = Iterator.continually(br.readLine()).takeWhile(_ != null).mkString
-    br.close
-    stream.close
-    body
-  }
-
-  /**
-   * Generates a coordinator stream name based on the job name and job id
-   * for the job. The format of the stream name will be:
-   * &#95;&#95;samza_coordinator_&lt;JOBNAME&gt;_&lt;JOBID&gt;.
-   */
-  def getCoordinatorStreamName(jobName: String, jobId: String) = {
-    "__samza_coordinator_%s_%s" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
-  }
-
-  /**
-   * Get a job's name and ID given a config. Job ID is defaulted to 1 if not
-   * defined in the config, and job name must be defined in config.
-   *
-   * @return A tuple of (jobName, jobId)
-   */
-  def getJobNameAndId(config: Config) = {
-    (config.getName.getOrElse(throw new ConfigException("Missing required config: job.name")), config.getJobId.getOrElse("1"))
-  }
-
-  /**
-   * Given a job's full config object, build a subset config which includes
-   * only the job name, job id, and system config for the coordinator stream.
-   */
-  def buildCoordinatorStreamConfig(config: Config) = {
-    val (jobName, jobId) = getJobNameAndId(config)
-    // Build a map with just the system config and job.name/job.id. This is what's required to start the JobCoordinator.
-    val map = config.subset(SystemConfig.SYSTEM_PREFIX format config.getCoordinatorSystemName, false).asScala ++
-      Map[String, String](
-        JobConfig.JOB_NAME -> jobName,
-        JobConfig.JOB_ID -> jobId,
-        JobConfig.JOB_COORDINATOR_SYSTEM -> config.getCoordinatorSystemName,
-        JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS -> String.valueOf(config.getMonitorPartitionChangeFrequency))
-    new MapConfig(map.asJava)
-  }
-
-  /**
-   * Get the coordinator system stream from the configuration
-   * @param config
-   * @return
-   */
-  def getCoordinatorSystemStream(config: Config) = {
-    val systemName = config.getCoordinatorSystemName
-    val (jobName, jobId) = Util.getJobNameAndId(config)
-    val streamName = Util.getCoordinatorStreamName(jobName, jobId)
-    new SystemStream(systemName, streamName)
-  }
-
-  /**
-    * Get the coordinator system factory from the configuration
-    * @param config
-    * @return
-    */
-  def getCoordinatorSystemFactory(config: Config) = {
-    val systemName = config.getCoordinatorSystemName
-    val systemFactoryClassName = config
-      .getSystemFactory(systemName)
-      .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName))
-    Util.getObj[SystemFactory](systemFactoryClassName)
-  }
-
-  /**
-   * The helper function converts a SSP to a string
-   * @param ssp System stream partition
-   * @return The string representation of the SSP
-   */
-  def sspToString(ssp: SystemStreamPartition): String = {
-     ssp.getSystem() + "." + ssp.getStream() + "." + String.valueOf(ssp.getPartition().getPartitionId())
-  }
-
-  /**
-   * The method converts the string SSP back to a SSP
-   * @param ssp The string form of the SSP
-   * @return An SSP typed object
-   */
-  def stringToSsp(ssp: String): SystemStreamPartition = {
-     val idx = ssp.indexOf('.');
-     val lastIdx = ssp.lastIndexOf('.')
-     if (idx < 0 || lastIdx < 0) {
-       throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition")
-     }
-     new SystemStreamPartition(new SystemStream(ssp.substring(0, idx), ssp.substring(idx + 1, lastIdx)),
-                               new Partition(Integer.parseInt(ssp.substring(lastIdx + 1))))
-  }
-
-  /**
-   * Method to generate the CRC32 checksum code for any given data
-   * @param data The string for which checksum has to be generated
-   * @return long type value representing the checksum
-   * */
-  def getChecksumValue(data: String) = {
-    val crc = new CRC32
-    crc.update(data.getBytes)
-    crc.getValue
-  }
-
-  /**
-   * Method that always writes checksum & data to a file
-   * Checksum is pre-fixed to the data and is a 32-bit long type data.
-   * @param file The file handle to write to
-   * @param data The data to be written to the file
-   * */
-  def writeDataToFile(file: File, data: String) = {
-    val checksum = getChecksumValue(data)
-    var oos: ObjectOutputStream = null
-    var fos: FileOutputStream = null
-    try {
-      fos = new FileOutputStream(file)
-      oos = new ObjectOutputStream(fos)
-      oos.writeLong(checksum)
-      oos.writeUTF(data)
-    } finally {
-      oos.close()
-      fos.close()
-    }
-  }
-
-  /**
-   * Method to read from a file that has a checksum prepended to the data
-   * @param file The file handle to read from
-   * */
-  def readDataFromFile(file: File) = {
-    var fis: FileInputStream = null
-    var ois: ObjectInputStream = null
-    try {
-      fis = new FileInputStream(file)
-      ois = new ObjectInputStream(fis)
-      val checksumFromFile = ois.readLong()
-      val data = ois.readUTF()
-      if(checksumFromFile == getChecksumValue(data)) {
-        data
-      } else {
-        info("Checksum match failed. Data in file is corrupted. Skipping content.")
-        null
-      }
-    } finally {
-      ois.close()
-      fis.close()
-    }
-  }
-
-  /**
-   * Convert a java map to a Scala map
-   * */
-  def javaMapAsScalaMap[T, K](javaMap: java.util.Map[T, K]): Map[T, K] = {
-    javaMap.asScala.toMap
-  }
-
-  /**
-   * Returns the the first host address which is not the loopback address, or {@link java.net.InetAddress#getLocalHost InetAddress.getLocalhost()} as a fallback
+   * Returns the first host address which is not the loopback address, or [[java.net.InetAddress#getLocalHost]] as a fallback
    *
-   * @return the {@link java.net.InetAddress InetAddress} which represents the localhost
+   * @return the [[java.net.InetAddress]] which represents the localhost
    */
   def getLocalHost: InetAddress = {
     val localHost = InetAddress.getLocalHost
     if (localHost.isLoopbackAddress) {
       debug("Hostname %s resolves to a loopback address, trying to resolve an external IP address.".format(localHost.getHostName))
-      val networkInterfaces = if (System.getProperty("os.name").startsWith("Windows")) NetworkInterface.getNetworkInterfaces.asScala.toList else NetworkInterface.getNetworkInterfaces.asScala.toList.reverse
+      val networkInterfaces = if (System.getProperty("os.name").startsWith("Windows")) {
+        NetworkInterface.getNetworkInterfaces.asScala.toList
+      } else {
+        NetworkInterface.getNetworkInterfaces.asScala.toList.reverse
+      }
       for (networkInterface <- networkInterfaces) {
-        val addresses = networkInterface.getInetAddresses.asScala.toList.filterNot(address => address.isLinkLocalAddress || address.isLoopbackAddress)
+        val addresses = networkInterface.getInetAddresses.asScala.toList
+          .filterNot(address => address.isLinkLocalAddress || address.isLoopbackAddress)
         if (addresses.nonEmpty) {
           val address = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head)
           debug("Found an external IP address %s which represents the localhost.".format(address.getHostAddress))
@@ -358,58 +114,6 @@ object Util extends Logging {
   }
 
   /**
-   * A helper function which returns system's default serde factory class according to the
-   * serde name. If not found, throw exception.
-   */
-  def defaultSerdeFactoryFromSerdeName(serdeName: String) = {
-    info("looking for default serdes")
-
-    val serde = serdeName match {
-      case "byte" => classOf[ByteSerdeFactory].getCanonicalName
-      case "bytebuffer" => classOf[ByteBufferSerdeFactory].getCanonicalName
-      case "integer" => classOf[IntegerSerdeFactory].getCanonicalName
-      case "json" => classOf[JsonSerdeFactory].getCanonicalName
-      case "long" => classOf[LongSerdeFactory].getCanonicalName
-      case "serializable" => classOf[SerializableSerdeFactory[java.io.Serializable]].getCanonicalName
-      case "string" => classOf[StringSerdeFactory].getCanonicalName
-      case "double" => classOf[DoubleSerdeFactory].getCanonicalName
-      case _ => throw new SamzaException("defaultSerdeFactoryFromSerdeName: No class defined for serde %s" format serdeName)
-    }
-    info("use default serde %s for %s" format (serde, serdeName))
-    serde
-  }
-
-  /**
-   * Add the supplied arguments and handle overflow by clamping the resulting sum to
-   * {@code Long.MinValue} if the sum would have been less than {@code Long.MinValue} or
-   * {@code Long.MaxValue} if the sum would have been greater than {@code Long.MaxValue}.
-   *
-   * @param lhs left hand side of sum
-   * @param rhs right hand side of sum
-   * @return the sum if no overflow occurs, or the clamped extreme if it does.
-   */
-  def clampAdd(lhs: Long, rhs: Long): Long = {
-    val sum = lhs + rhs
-
-    // From "Hacker's Delight", overflow occurs IFF both operands have the same sign and the
-    // sign of the sum differs from the operands. Here we're doing a basic bitwise check that
-    // collapses 6 branches down to 2. The expression {@code lhs ^ rhs} will have the high-order
-    // bit set to true IFF the signs are different.
-    if ((~(lhs ^ rhs) & (lhs ^ sum)) < 0) {
-      return if (lhs >= 0) Long.MaxValue else Long.MinValue
-    }
-
-    sum
-  }
-
-  /**
-   * Implicitly convert the Java TimerClock to Scala clock function which returns long timestamp.
-   * @param c Java TimeClock
-   * @return Scala clock function
-   */
-  implicit def asScalaClock(c: HighResolutionClock): () => Long = () => c.nanoTime()
-
-  /**
    * Re-writes configuration using a ConfigRewriter, if one is defined. If
    * there is no ConfigRewriter defined for the job, then this method is a
    * no-op.
@@ -419,10 +123,10 @@ object Util extends Logging {
    */
   def rewriteConfig(config: Config): Config = {
     def rewrite(c: Config, rewriterName: String): Config = {
-      val klass = config
+      val rewriterClassName = config
               .getConfigRewriterClass(rewriterName)
               .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
-      val rewriter = Util.getObj[ConfigRewriter](klass)
+      val rewriter = Util.getObj(rewriterClassName, classOf[ConfigRewriter])
       info("Re-writing config with " + rewriter)
       rewriter.rewrite(rewriterName, c)
     }
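
For context on the new two-argument Util.getObj used above, a hedged sketch of a call site after this change; the factory class name is a placeholder config value, not something taken from this patch.

    import org.apache.samza.system.SystemFactory
    import org.apache.samza.util.Util

    object GetObjExample {
      // Placeholder; in real code this comes from config, e.g. systems.<name>.samza.factory.
      val factoryClassName = "org.apache.samza.system.kafka.KafkaSystemFactory"

      // The expected class is now passed explicitly instead of relying solely on the
      // unchecked Util.getObj[SystemFactory](className) cast at the call site.
      val factory: SystemFactory = Util.getObj(factoryClassName, classOf[SystemFactory])
    }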

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
index 6413413..e537a91 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
@@ -26,8 +26,8 @@ import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.system.*;
+import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
-import org.apache.samza.util.Util;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -88,7 +88,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
     if (jobId == null) {
       jobId = "1";
     }
-    String streamName = Util.getCoordinatorStreamName(jobName, jobId);
+    String streamName = CoordinatorStreamUtil.getCoordinatorStreamName(jobName, jobId);
     SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, streamName, new Partition(0));
     mockConsumer = new MockCoordinatorStreamWrappedConsumer(systemStreamPartition, config);
     return mockConsumer;
@@ -97,7 +97,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
   private SystemStream getCoordinatorSystemStream(Config config) {
     assertNotNull(config.get("job.coordinator.system"));
     assertNotNull(config.get("job.name"));
-    return new SystemStream(config.get("job.coordinator.system"), Util.getCoordinatorStreamName(config.get("job.name"),
+    return new SystemStream(config.get("job.coordinator.system"), CoordinatorStreamUtil.getCoordinatorStreamName(config.get("job.name"),
         config.get("job.id") == null ? "1" : config.get("job.id")));
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/test/java/org/apache/samza/util/TestMathUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestMathUtils.java b/samza-core/src/test/java/org/apache/samza/util/TestMathUtils.java
index 46e0735..2f95016 100644
--- a/samza-core/src/test/java/org/apache/samza/util/TestMathUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/util/TestMathUtils.java
@@ -20,43 +20,58 @@
 package org.apache.samza.util;
 
 import com.google.common.collect.ImmutableList;
-import junit.framework.Assert;
-import org.apache.samza.operators.util.MathUtils;
-import org.junit.Test;
 
 import java.util.Collections;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
 
 public class TestMathUtils {
 
   @Test(expected = IllegalArgumentException.class)
   public void testGcdWithNullInputs() {
-    MathUtils.gcd(null);
+    MathUtil.gcd(null);
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testGcdWithEmptyInputs() {
-    MathUtils.gcd(Collections.emptyList());
+    MathUtil.gcd(Collections.emptyList());
   }
 
   @Test
   public void testGcdWithValidInputs() {
     // gcd(x, x) = x
-    Assert.assertEquals(2, MathUtils.gcd(ImmutableList.of(2L, 2L)));
-    Assert.assertEquals(15, MathUtils.gcd(ImmutableList.of(15L)));
-    Assert.assertEquals(1, MathUtils.gcd(ImmutableList.of(1L)));
+    assertEquals(2, MathUtil.gcd(ImmutableList.of(2L, 2L)));
+    assertEquals(15, MathUtil.gcd(ImmutableList.of(15L)));
+    assertEquals(1, MathUtil.gcd(ImmutableList.of(1L)));
 
     // gcd(0,x) = x
-    Assert.assertEquals(2, MathUtils.gcd(ImmutableList.of(2L, 0L)));
+    assertEquals(2, MathUtil.gcd(ImmutableList.of(2L, 0L)));
 
     // gcd(1,x) = 1
-    Assert.assertEquals(1, MathUtils.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L, 1L)));
+    assertEquals(1, MathUtil.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L, 1L)));
 
     // other happy path test cases
-    Assert.assertEquals(10, MathUtils.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L, 0L)));
-    Assert.assertEquals(10, MathUtils.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L)));
-    Assert.assertEquals(5, MathUtils.gcd(ImmutableList.of(25L, 35L, 45L, 55L)));
+    assertEquals(10, MathUtil.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L, 0L)));
+    assertEquals(10, MathUtil.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L)));
+    assertEquals(5, MathUtil.gcd(ImmutableList.of(25L, 35L, 45L, 55L)));
 
-    Assert.assertEquals(1, MathUtils.gcd(ImmutableList.of(25L, 35L, 45L, 55L, 13L)));
+    assertEquals(1, MathUtil.gcd(ImmutableList.of(25L, 35L, 45L, 55L, 13L)));
   }
 
+  @Test
+  public void testClampAdd() {
+    assertEquals(0, MathUtil.clampAdd(0, 0));
+    assertEquals(2, MathUtil.clampAdd(1, 1));
+    assertEquals(-2, MathUtil.clampAdd(-1, -1));
+    assertEquals(Long.MAX_VALUE, MathUtil.clampAdd(Long.MAX_VALUE, 0));
+    assertEquals(Long.MAX_VALUE - 1, MathUtil.clampAdd(Long.MAX_VALUE, -1));
+    assertEquals(Long.MAX_VALUE, MathUtil.clampAdd(Long.MAX_VALUE, 1));
+    assertEquals(Long.MAX_VALUE, MathUtil.clampAdd(Long.MAX_VALUE, Long.MAX_VALUE));
+    assertEquals(Long.MIN_VALUE, MathUtil.clampAdd(Long.MIN_VALUE, 0));
+    assertEquals(Long.MIN_VALUE, MathUtil.clampAdd(Long.MIN_VALUE, -1));
+    assertEquals(Long.MIN_VALUE + 1, MathUtil.clampAdd(Long.MIN_VALUE, 1));
+    assertEquals(Long.MIN_VALUE, MathUtil.clampAdd(Long.MIN_VALUE, Long.MIN_VALUE));
+    assertEquals(-1, MathUtil.clampAdd(Long.MAX_VALUE, Long.MIN_VALUE));
+  }
 }
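
For reference while reading the new clampAdd assertions, a sketch of the clamping logic they exercise, based on the clampAdd implementation removed from Util above (the MathUtil class itself is not shown in this part of the diff).

    object ClampAddSketch {
      // Adds lhs and rhs, clamping to Long.MinValue / Long.MaxValue on overflow.
      def clampAdd(lhs: Long, rhs: Long): Long = {
        val sum = lhs + rhs
        // Overflow occurred iff both operands share a sign and the sum's sign differs.
        if ((~(lhs ^ rhs) & (lhs ^ sum)) < 0) {
          if (lhs >= 0) Long.MaxValue else Long.MinValue
        } else {
          sum
        }
      }
    }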

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/test/scala/org/apache/samza/config/TestSerializerConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestSerializerConfig.scala b/samza-core/src/test/scala/org/apache/samza/config/TestSerializerConfig.scala
new file mode 100644
index 0000000..83c901c
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/config/TestSerializerConfig.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import org.apache.samza.SamzaException
+import org.apache.samza.serializers.ByteSerdeFactory
+import org.apache.samza.serializers.DoubleSerdeFactory
+import org.apache.samza.serializers.IntegerSerdeFactory
+import org.apache.samza.serializers.JsonSerdeFactory
+import org.apache.samza.serializers.LongSerdeFactory
+import org.apache.samza.config.SerializerConfig.getSerdeFactoryName
+import org.apache.samza.serializers.SerializableSerdeFactory
+import org.apache.samza.serializers.StringSerdeFactory
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+class TestSerializerConfig {
+  @Test
+  def testGetSerdeFactoryName {
+    val config = new MapConfig
+    assertEquals(classOf[ByteSerdeFactory].getName, getSerdeFactoryName("byte"))
+    assertEquals(classOf[IntegerSerdeFactory].getName, getSerdeFactoryName("integer"))
+    assertEquals(classOf[JsonSerdeFactory].getName, getSerdeFactoryName("json"))
+    assertEquals(classOf[LongSerdeFactory].getName, getSerdeFactoryName("long"))
+    assertEquals(classOf[SerializableSerdeFactory[java.io.Serializable@unchecked]].getName, getSerdeFactoryName("serializable"))
+    assertEquals(classOf[StringSerdeFactory].getName, getSerdeFactoryName("string"))
+    assertEquals(classOf[DoubleSerdeFactory].getName, getSerdeFactoryName("double"))
+
+    // throws a SamzaException if it cannot find the correct serde
+    var throwSamzaException = false
+    try {
+      getSerdeFactoryName("otherName")
+    } catch {
+      case e: SamzaException => throwSamzaException = true
+      case _: Exception =>
+    }
+    assertTrue(throwSamzaException)
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index cf05b3b..95a0a11 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -25,7 +25,7 @@ import org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFacto
 import org.apache.samza.job.local.ProcessJobFactory
 import org.apache.samza.job.local.ThreadJobFactory
 import org.apache.samza.serializers.model.SamzaObjectMapper
-import org.apache.samza.util.Util
+import org.apache.samza.util.{HttpUtil, Util}
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 
@@ -119,8 +119,9 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     assertEquals(new MapConfig(config.asJava), coordinator.jobModel.getConfig)
     assertEquals(expectedJobModel, coordinator.jobModel)
 
+    val response = HttpUtil.read(coordinator.server.getUrl)
     // Verify that the JobServlet is serving the correct jobModel
-    val jobModelFromCoordinatorUrl = SamzaObjectMapper.getObjectMapper.readValue(Util.read(coordinator.server.getUrl), classOf[JobModel])
+    val jobModelFromCoordinatorUrl = SamzaObjectMapper.getObjectMapper.readValue(response, classOf[JobModel])
     assertEquals(expectedJobModel, jobModelFromCoordinatorUrl)
 
     coordinator.stop
@@ -245,7 +246,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
       val systemFactoryClassName = config
         .getSystemFactory(systemName)
         .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
-      val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+      val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory])
       systemName -> systemFactory.getAdmin(systemName, config)
     }).toMap
 

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
index f1dcc3d..6ca4070 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
@@ -19,11 +19,10 @@
 
 package org.apache.samza.coordinator.server
 
-import org.apache.samza.util.Util
+import org.apache.samza.util.{HttpUtil, Util}
 import org.junit.Assert._
 import org.junit.Test
 import java.net.URL
-import org.eclipse.jetty.server.Connector
 
 class TestHttpServer {
   @Test
@@ -32,9 +31,9 @@ class TestHttpServer {
     try {
       server.addServlet("/basic", new BasicServlet())
       server.start
-      val body = Util.read(new URL(server.getUrl + "/basic"))
+      val body = HttpUtil.read(new URL(server.getUrl + "/basic"))
       assertEquals("{\"foo\":\"bar\"}", body)
-      val css = Util.read(new URL(server.getUrl + "/css/ropa-sans.css"))
+      val css = HttpUtil.read(new URL(server.getUrl + "/css/ropa-sans.css"))
       assertTrue(css.contains("RopaSans"))
     } finally {
       server.stop

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
index 774230c..cd6c5be 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
@@ -31,6 +31,9 @@ import org.apache.samza.system.SystemStream
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
+import org.apache.samza.SamzaException
+import org.apache.samza.config.MapConfig
+import org.apache.samza.util.Util
 
 class TestSerdeManager {
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index 90a4c01..bbdb819 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -29,7 +29,7 @@ import org.apache.samza.container.TaskName
 import org.apache.samza.storage.StoreProperties.StorePropertiesBuilder
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.system._
-import org.apache.samza.util.{SystemClock, Util}
+import org.apache.samza.util.{FileUtil, SystemClock}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.mockito.Matchers._
@@ -56,8 +56,8 @@ class TestTaskStorageManager extends MockitoSugar {
 
   @After
   def tearDownTestDirs() {
-    Util.rm(TaskStorageManagerBuilder.defaultStoreBaseDir)
-    Util.rm(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir)
+    FileUtil.rm(TaskStorageManagerBuilder.defaultStoreBaseDir)
+    FileUtil.rm(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir)
   }
 
   /**
@@ -125,7 +125,7 @@ class TestTaskStorageManager extends MockitoSugar {
     // Test 2: flush should update the offset file
     taskManager.flush()
     assertTrue(offsetFile.exists())
-    assertEquals("50", Util.readDataFromFile(offsetFile))
+    assertEquals("50", FileUtil.readWithChecksum(offsetFile))
 
     // Test 3: Update sspMetadata before shutdown and verify that offset file is updated correctly
     metadata = new SystemStreamMetadata("testStream", new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
@@ -142,7 +142,7 @@ class TestTaskStorageManager extends MockitoSugar {
     taskManager.stop()
     assertTrue(storeFile.exists())
     assertTrue(offsetFile.exists())
-    assertEquals("100", Util.readDataFromFile(offsetFile))
+    assertEquals("100", FileUtil.readWithChecksum(offsetFile))
 
 
     // Test 4: Initialize again with an updated sspMetadata; Verify that it restores from the correct offset
@@ -274,7 +274,7 @@ class TestTaskStorageManager extends MockitoSugar {
   @Test
   def testLoggedStoreDirsWithOffsetFileAreNotDeletedInCleanBaseDirs() {
     val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET")
-    Util.writeDataToFile(offsetFilePath, "100")
+    FileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val taskStorageManager = new TaskStorageManagerBuilder()
       .addStore(loggedStore, true)
@@ -296,7 +296,7 @@ class TestTaskStorageManager extends MockitoSugar {
     val storeDirectory = TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName)
     val offsetFile = new File(storeDirectory, "OFFSET")
     offsetFile.createNewFile()
-    Util.writeDataToFile(offsetFile, "Test Offset Data")
+    FileUtil.writeWithChecksum(offsetFile, "Test Offset Data")
     offsetFile.setLastModified(0)
     val taskStorageManager = new TaskStorageManagerBuilder().addStore(store, false)
       .addStore(loggedStore, true)
@@ -315,7 +315,7 @@ class TestTaskStorageManager extends MockitoSugar {
   @Test
   def testOffsetFileIsRemovedInCleanBaseDirsForInMemoryLoggedStore() {
     val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET")
-    Util.writeDataToFile(offsetFilePath, "100")
+    FileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val taskStorageManager = new TaskStorageManagerBuilder()
       .addStore(loggedStore, false)
@@ -352,7 +352,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath))
+    assertEquals("Found incorrect value in offset file!", "100", FileUtil.readWithChecksum(offsetFilePath))
   }
 
   /**
@@ -386,7 +386,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath))
+    assertEquals("Found incorrect value in offset file!", "100", FileUtil.readWithChecksum(offsetFilePath))
 
     assertTrue("Offset file got created for a store that is not persisted to the disk!!", !anotherOffsetPath.exists())
   }
@@ -416,7 +416,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath))
+    assertEquals("Found incorrect value in offset file!", "100", FileUtil.readWithChecksum(offsetFilePath))
 
     //Invoke test method again
     taskStorageManager.flush()
@@ -430,7 +430,7 @@ class TestTaskStorageManager extends MockitoSugar {
     val partition = new Partition(0)
 
     val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + "OFFSET")
-    Util.writeDataToFile(offsetFilePath, "100")
+    FileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val mockSystemAdmin = mock[SystemAdmin]
     var mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream" , Map(partition -> new SystemStreamPartitionMetadata("20", "139", "140")).asJava))
@@ -449,7 +449,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", "139", Util.readDataFromFile(offsetFilePath))
+    assertEquals("Found incorrect value in offset file!", "139", FileUtil.readWithChecksum(offsetFilePath))
 
     // Flush again
     mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream" , Map(partition -> new SystemStreamPartitionMetadata("20", "193", "194")).asJava))
@@ -461,7 +461,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", "193", Util.readDataFromFile(offsetFilePath))
+    assertEquals("Found incorrect value in offset file!", "193", FileUtil.readWithChecksum(offsetFilePath))
   }
 
   @Test
@@ -556,7 +556,7 @@ class TestTaskStorageManager extends MockitoSugar {
     if (writeOffsetFile) {
       val offsetFile = new File(storeDirectory, "OFFSET")
       if (fileOffset != null) {
-        Util.writeDataToFile(offsetFile, fileOffset)
+        FileUtil.writeWithChecksum(offsetFile, fileOffset)
       } else {
         // Write garbage to produce a null result when it's read
         val fos = new FileOutputStream(offsetFile)

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
new file mode 100644
index 0000000..5bb6da7
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.util
+
+import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
+
+import org.junit.Assert.{assertEquals, assertNull, assertTrue}
+import org.junit.Test
+
+class TestFileUtil {
+  val data = "100"
+  val checksum = FileUtil.getChecksum(data)
+  val file = new File(System.getProperty("java.io.tmpdir"), "test")
+
+  @Test
+  def testWriteDataToFile() {
+    // Invoke test
+    FileUtil.writeWithChecksum(file, data)
+
+    // Check that file exists
+    assertTrue("File was not created!", file.exists())
+    val fis = new FileInputStream(file)
+    val ois = new ObjectInputStream(fis)
+
+    // Check content of the file is as expected
+    assertEquals(checksum, ois.readLong())
+    assertEquals(data, ois.readUTF())
+    ois.close()
+    fis.close()
+  }
+
+  @Test
+  def testReadDataFromFile() {
+    // Setup
+    val fos = new FileOutputStream(file)
+    val oos = new ObjectOutputStream(fos)
+    oos.writeLong(checksum)
+    oos.writeUTF(data)
+    oos.close()
+    fos.close()
+
+    // Invoke test
+    val result = FileUtil.readWithChecksum(file)
+
+    // Check data returned
+    assertEquals(data, result)
+  }
+
+  @Test
+  def testReadInvalidDataFromFile() {
+    // Write garbage to produce a null result when it's read
+    val fos = new FileOutputStream(file)
+    val oos = new ObjectOutputStream(fos)
+    oos.writeLong(1)
+    oos.writeUTF("Junk Data")
+    oos.close()
+    fos.close()
+
+    // Invoke test
+    val result = FileUtil.readWithChecksum(file)
+
+    // Check data returned
+    assertNull(result)
+  }
+}
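
For reference, a minimal sketch of how the new FileUtil helpers are meant to be used together (the file location and offset value below are illustrative only, not taken from the patch):

    import java.io.File
    import org.apache.samza.util.FileUtil

    object OffsetFileExample extends App {
      // Any writable path works for this sketch; stores pick their own location.
      val offsetFile = new File(System.getProperty("java.io.tmpdir"), "OFFSET")

      // Prefixes a CRC32 checksum to the data before writing it out.
      FileUtil.writeWithChecksum(offsetFile, "139")

      // Returns the data if the stored checksum still matches, or null if the
      // file contents are corrupted.
      val offset = FileUtil.readWithChecksum(offsetFile)
      println(s"restored offset: $offset")
    }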

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index fae735b..f0b8a17 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -19,7 +19,6 @@
 
 package org.apache.samza.util
 
-import java.io._
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.config.MapConfig
@@ -27,116 +26,21 @@ import org.apache.samza.serializers._
 import org.apache.samza.SamzaException
 
 class TestUtil {
-
-  val data = "100"
-  val checksum = Util.getChecksumValue(data)
-  val file = new File(System.getProperty("java.io.tmpdir"), "test")
-
-  @Test
-  def testWriteDataToFile() {
-    // Invoke test
-    Util.writeDataToFile(file, data)
-
-    // Check that file exists
-    assertTrue("File was not created!", file.exists())
-    val fis = new FileInputStream(file)
-    val ois = new ObjectInputStream(fis)
-
-    // Check content of the file is as expected
-    assertEquals(checksum, ois.readLong())
-    assertEquals(data, ois.readUTF())
-    ois.close()
-    fis.close()
-  }
-
-  @Test
-  def testReadDataFromFile() {
-    // Setup
-    val fos = new FileOutputStream(file)
-    val oos = new ObjectOutputStream(fos)
-    oos.writeLong(checksum)
-    oos.writeUTF(data)
-    oos.close()
-    fos.close()
-
-    // Invoke test
-    val result = Util.readDataFromFile(file)
-
-    // Check data returned
-    assertEquals(data, result)
-  }
-
-  @Test
-  def testReadInvalidDataFromFile() {
-    // Write garbage to produce a null result when it's read
-    val fos = new FileOutputStream(file)
-    val oos = new ObjectOutputStream(fos)
-    oos.writeLong(1)
-    oos.writeUTF("Junk Data")
-    oos.close()
-    fos.close()
-
-    // Invoke test
-    val result = Util.readDataFromFile(file)
-
-    // Check data returned
-    assertNull(result)
-  }
-
   @Test
   def testGetLocalHost(): Unit = {
     assertNotNull(Util.getLocalHost)
   }
 
   @Test
-  def testDefaultSerdeFactoryFromSerdeName {
-    import Util._
-    val config = new MapConfig
-    assertEquals(classOf[ByteSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("byte"))
-    assertEquals(classOf[IntegerSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("integer"))
-    assertEquals(classOf[JsonSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("json"))
-    assertEquals(classOf[LongSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("long"))
-    assertEquals(classOf[SerializableSerdeFactory[java.io.Serializable@unchecked]].getName, defaultSerdeFactoryFromSerdeName("serializable"))
-    assertEquals(classOf[StringSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("string"))
-    assertEquals(classOf[DoubleSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("double"))
-
-    // throw SamzaException if can not find the correct serde
-    var throwSamzaException = false
-    try {
-      defaultSerdeFactoryFromSerdeName("otherName")
-    } catch {
-      case e: SamzaException => throwSamzaException = true
-      case _: Exception =>
-    }
-    assertTrue(throwSamzaException)
-  }
-
-  @Test
-  def testClampAdd() {
-    assertEquals(0, Util.clampAdd(0, 0))
-    assertEquals(2, Util.clampAdd(1, 1))
-    assertEquals(-2, Util.clampAdd(-1, -1))
-    assertEquals(Long.MaxValue, Util.clampAdd(Long.MaxValue, 0))
-    assertEquals(Long.MaxValue - 1, Util.clampAdd(Long.MaxValue, -1))
-    assertEquals(Long.MaxValue, Util.clampAdd(Long.MaxValue, 1))
-    assertEquals(Long.MaxValue, Util.clampAdd(Long.MaxValue, Long.MaxValue))
-    assertEquals(Long.MinValue, Util.clampAdd(Long.MinValue, 0))
-    assertEquals(Long.MinValue, Util.clampAdd(Long.MinValue, -1))
-    assertEquals(Long.MinValue + 1, Util.clampAdd(Long.MinValue, 1))
-    assertEquals(Long.MinValue, Util.clampAdd(Long.MinValue, Long.MinValue))
-    assertEquals(-1, Util.clampAdd(Long.MaxValue, Long.MinValue))
-  }
-
-  @Test
   def testGetObjExistingClass() {
-    val obj = Util.getObj[MapConfig]("org.apache.samza.config.MapConfig")
+    val obj = Util.getObj("org.apache.samza.config.MapConfig", classOf[MapConfig])
     assertNotNull(obj)
     assertEquals(classOf[MapConfig], obj.getClass())
   }
 
   @Test(expected = classOf[ClassNotFoundException])
   def testGetObjNonexistentClass() {
-    Util.getObj("this.class.does.NotExist")
+    Util.getObj("this.class.does.NotExist", classOf[Object])
     assert(false, "This should not get hit.")
   }
 }
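
As a usage note, the two-argument Util.getObj used throughout this patch carries the expected type as a Class argument instead of a type parameter; a minimal sketch, with a placeholder class name:

    import org.apache.samza.system.SystemFactory
    import org.apache.samza.util.Util

    object GetObjExample extends App {
      // The class name would normally come from config; hard-coded for illustration.
      val factoryClassName = "org.apache.samza.system.kafka.KafkaSystemFactory"

      // Passing classOf[SystemFactory] replaces the old Util.getObj[SystemFactory](...)
      // form and removes the need for a cast at the call site.
      val systemFactory: SystemFactory = Util.getObj(factoryClassName, classOf[SystemFactory])
      println(systemFactory.getClass.getName)
    }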

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
index 074323f..5c8328c 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
@@ -82,7 +82,7 @@ public class ElasticsearchSystemFactory implements SystemFactory {
 
   protected static IndexRequestFactory getIndexRequestFactory(ElasticsearchConfig config) {
     if (config.getIndexRequestFactoryClassName().isPresent()) {
-      return (IndexRequestFactory) Util.getObj(config.getIndexRequestFactoryClassName().get());
+      return Util.getObj(config.getIndexRequestFactoryClassName().get(), IndexRequestFactory.class);
     } else {
       return new DefaultIndexRequestFactory();
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
index 79bca5b..16de121 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
@@ -24,14 +24,14 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
 import org.apache.samza.system.hdfs.writer.HdfsWriter
 import org.apache.samza.system.{OutgoingMessageEnvelope, SystemProducer}
-import org.apache.samza.util.{Logging, TimerUtils}
+import org.apache.samza.util.{Logging, TimerUtil}
 
 import scala.collection.mutable.{Map => MMap}
 
 
 class HdfsSystemProducer(
   systemName: String, clientId: String, config: HdfsConfig, metrics: HdfsSystemProducerMetrics,
-  val clock: () => Long = () => System.currentTimeMillis) extends SystemProducer with Logging with TimerUtils {
+  val clock: () => Long = () => System.currentTimeMillis) extends SystemProducer with Logging with TimerUtil {
   val dfs = FileSystem.newInstance(new Configuration(true))
   val writers: MMap[String, HdfsWriter[_]] = MMap.empty[String, HdfsWriter[_]]
   private val lock = new Object //synchronization lock for thread safe access

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 48d6671..8d4098f 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -42,7 +42,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
       .getSystemFactory(checkpointSystemName)
       .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format checkpointSystemName))
 
-    val checkpointSystemFactory = Util.getObj[SystemFactory](checkpointSystemFactoryName)
+    val checkpointSystemFactory = Util.getObj(checkpointSystemFactoryName, classOf[SystemFactory])
     val checkpointTopic = KafkaUtil.getCheckpointTopic(jobName, jobId, config)
 
     info(s"Creating a KafkaCheckpointManager to consume from $checkpointTopic")

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index 9eaf895..2a17df8 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -34,14 +34,14 @@ import org.apache.samza.system.SystemProducerException
 import org.apache.samza.util.ExponentialSleepStrategy
 import org.apache.samza.util.KafkaUtil
 import org.apache.samza.util.Logging
-import org.apache.samza.util.TimerUtils
+import org.apache.samza.util.TimerUtil
 
 class KafkaSystemProducer(systemName: String,
                           retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
                           getProducer: () => Producer[Array[Byte], Array[Byte]],
                           metrics: KafkaSystemProducerMetrics,
                           val clock: () => Long = () => System.nanoTime,
-                          val dropProducerExceptions: Boolean = false) extends SystemProducer with Logging with TimerUtils {
+                          val dropProducerExceptions: Boolean = false) extends SystemProducer with Logging with TimerUtil {
 
   // Represents a fatal error that caused the producer to close.
   val fatalException: AtomicReference[SystemProducerException] = new AtomicReference[SystemProducerException]()

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index 51af518..125cf61 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -27,7 +27,7 @@ import kafka.api.TopicMetadata;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.StreamValidationException;
 import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.util.Util;
+import org.apache.samza.util.ScalaJavaUtil;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -58,7 +58,7 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
     Properties coordProps = new Properties();
     Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
 
-    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, Util.javaMapAsScalaMap(changeLogMap)));
+    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, ScalaJavaUtil.toScalaMap(changeLogMap)));
     StreamSpec spec = StreamSpec.createCoordinatorStreamSpec(STREAM, SYSTEM());
 
     Mockito.doAnswer(invocationOnMock -> {
@@ -90,7 +90,7 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
     Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
     changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
 
-    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, Util.javaMapAsScalaMap(changeLogMap)));
+    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, ScalaJavaUtil.toScalaMap(changeLogMap)));
     StreamSpec spec = StreamSpec.createChangeLogStreamSpec(STREAM, SYSTEM(), PARTITIONS);
 
     Mockito.doAnswer(invocationOnMock -> {
@@ -123,7 +123,7 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
     Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
     changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
 
-    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, Util.javaMapAsScalaMap(changeLogMap)));
+    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, ScalaJavaUtil.toScalaMap(changeLogMap)));
     StreamSpec spec = StreamSpec.createChangeLogStreamSpec(STREAM, SYSTEM(), PARTITIONS);
     Mockito.doAnswer(invocationOnMock -> {
       StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();

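The tests above move from Util.javaMapAsScalaMap to ScalaJavaUtil.toScalaMap. The helper itself is not part of this hunk, but a conversion of that shape can be sketched with the standard JavaConverters (the helper below is an illustrative stand-in, not the actual ScalaJavaUtil code):

    import scala.collection.JavaConverters._

    object MapConversionSketch extends App {
      // Stand-in for a toScalaMap-style helper: wrap a java.util.Map and
      // materialize it as an immutable Scala Map.
      def toScalaMap[K, V](javaMap: java.util.Map[K, V]): Map[K, V] =
        javaMap.asScala.toMap

      val changeLogMap = new java.util.HashMap[String, Integer]()
      changeLogMap.put("test-stream", 3)
      println(toScalaMap(changeLogMap))  // Map(test-stream -> 3)
    }
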
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 03b0d2c..71718b0 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -191,7 +191,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
       .getSystemFactory(systemName)
       .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName))
 
-    val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+    val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory])
 
     val spec = new KafkaStreamSpec("id", cpTopic, checkpointSystemName, 1, 1, false, props)
     new KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, config, new NoOpMetricsRegistry, serde)

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
index 1fa78f8..9dca23c 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
@@ -25,6 +25,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSerializerConfig;
 import org.apache.samza.config.JavaStorageConfig;
+import org.apache.samza.config.SerializerConfig$;
 import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.metrics.MetricsRegistryMap;
@@ -125,8 +126,8 @@ public class RocksDbKeyValueReader {
   private Serde<Object> getSerdeFromName(String name, JavaSerializerConfig serializerConfig) {
     String serdeClassName = serializerConfig.getSerdeClass(name);
     if (serdeClassName == null) {
-      serdeClassName = Util.defaultSerdeFactoryFromSerdeName(name);
+      serdeClassName = SerializerConfig$.MODULE$.getSerdeFactoryName(name);
     }
-    return Util.<SerdeFactory<Object>> getObj(serdeClassName).getSerde(name, serializerConfig);
+    return Util.getObj(serdeClassName, SerdeFactory.class).getSerde(name, serializerConfig);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index eae7da2..856cc4e 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -20,12 +20,13 @@
 package org.apache.samza.storage.kv
 
 import java.io.File
+import java.util.Comparator
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
-import org.apache.samza.util.{LexicographicComparator, Logging}
+import org.apache.samza.util.Logging
 import org.rocksdb.{TtlDB, _}
 
 object RocksDbKeyValueStore extends Logging {
@@ -301,4 +302,21 @@ class RocksDbKeyValueStore(
       super.hasNext() && comparator.compare(peekKey(), to) < 0
     }
   }
+
+  /**
+    * A comparator that applies a lexicographical comparison on byte arrays.
+    */
+  class LexicographicComparator extends Comparator[Array[Byte]] {
+    def compare(k1: Array[Byte], k2: Array[Byte]): Int = {
+      val l = math.min(k1.length, k2.length)
+      var i = 0
+      while (i < l) {
+        if (k1(i) != k2(i))
+          return (k1(i) & 0xff) - (k2(i) & 0xff)
+        i += 1
+      }
+      // Prefixes are equal, so the shorter array sorts first.
+      k1.length - k2.length
+    }
+  }
 }
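
A short worked example of the byte-wise, unsigned ordering the nested comparator implements (a standalone copy of the same logic, since the real class now lives inside RocksDbKeyValueStore):

    object LexicographicCompareExample extends App {
      // Same logic as the comparator above: compare byte by byte as unsigned
      // values, and fall back to length when one key is a prefix of the other.
      def compare(k1: Array[Byte], k2: Array[Byte]): Int = {
        val l = math.min(k1.length, k2.length)
        var i = 0
        while (i < l) {
          if (k1(i) != k2(i))
            return (k1(i) & 0xff) - (k2(i) & 0xff)
          i += 1
        }
        k1.length - k2.length
      }

      // 0xFF compares as 255 rather than -1, so it sorts after 0x02.
      println(compare(Array[Byte](0x01, 0xFF.toByte), Array[Byte](0x01, 0x02)) > 0)  // true
      // A strict prefix sorts before the longer key.
      println(compare(Array[Byte](0x01), Array[Byte](0x01, 0x00)) < 0)               // true
    }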

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
index e3a2970..da80560 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
@@ -25,12 +25,11 @@ import org.apache.samza.SamzaException
 import org.apache.samza.container.SamzaContainerContext
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.serializers.Serde
-import org.apache.samza.storage.{StoreProperties, StorageEngine, StorageEngineFactory}
+import org.apache.samza.storage.{StorageEngine, StorageEngineFactory, StoreProperties}
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.task.MessageCollector
 import org.apache.samza.config.MetricsConfig.Config2Metrics
-import org.apache.samza.util.HighResolutionClock
-import org.apache.samza.util.Util.asScalaClock
+import org.apache.samza.util.{HighResolutionClock, ScalaJavaUtil}
 
 /**
  * A key value storage engine factory implementation
@@ -152,7 +151,8 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
       }
     }
 
-    new KeyValueStorageEngine(storePropertiesBuilder.build(), nullSafeStore, rawStore, keyValueStorageEngineMetrics, batchSize, clock)
+    new KeyValueStorageEngine(storePropertiesBuilder.build(), nullSafeStore, rawStore,
+      keyValueStorageEngineMetrics, batchSize, () => clock.nanoTime())
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index 373e18a..5f7bbd8 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -22,7 +22,7 @@ package org.apache.samza.storage.kv
 import org.apache.samza.util.Logging
 import org.apache.samza.storage.{StoreProperties, StorageEngine}
 import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.util.TimerUtils
+import org.apache.samza.util.TimerUtil
 
 import scala.collection.JavaConverters._
 
@@ -37,7 +37,7 @@ class KeyValueStorageEngine[K, V](
   rawStore: KeyValueStore[Array[Byte], Array[Byte]],
   metrics: KeyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics,
   batchSize: Int = 500,
-  val clock: () => Long = { System.nanoTime }) extends StorageEngine with KeyValueStore[K, V] with TimerUtils with Logging {
+  val clock: () => Long = { System.nanoTime }) extends StorageEngine with KeyValueStore[K, V] with TimerUtil with Logging {
 
   var count = 0
 

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
index 3cc35d3..7adffa9 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
@@ -19,8 +19,6 @@
 
 package org.apache.samza.storage.kv
 
-import org.apache.samza.util.Util.notNull
-
 import scala.collection.JavaConverters._
 
 object NullSafeKeyValueStore {
@@ -85,4 +83,10 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt
   def close {
     store.close
   }
+
+  private def notNull[T](obj: T, msg: String) = {
+    if (obj == null) {
+      throw new NullPointerException(msg)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index ec63358..ab29b71 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -50,6 +50,8 @@ import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.ExponentialSleepStrategy;
+import org.apache.samza.util.HttpUtil;
 import org.apache.samza.util.Util;
 
 /**
@@ -267,9 +269,8 @@ public class StreamAppender extends AppenderSkeleton {
         config = JobModelManager.currentJobModelManager().jobModel().getConfig();
       } else {
         String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
-        config = SamzaObjectMapper.getObjectMapper()
-            .readValue(Util.read(new URL(url), 30000), JobModel.class)
-            .getConfig();
+        String response = HttpUtil.read(new URL(url), 30000, new ExponentialSleepStrategy());
+        config = SamzaObjectMapper.getObjectMapper().readValue(response, JobModel.class).getConfig();
       }
     } catch (IOException e) {
       throw new SamzaException("can not read the config", e);
@@ -294,7 +295,7 @@ public class StreamAppender extends AppenderSkeleton {
     String systemName = log4jSystemConfig.getSystemName();
     String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName);
     if (systemFactoryName != null) {
-      systemFactory = Util.<SystemFactory>getObj(systemFactoryName);
+      systemFactory = Util.getObj(systemFactoryName, SystemFactory.class);
     } else {
       throw new SamzaException("Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use");
     }
@@ -388,7 +389,7 @@ public class StreamAppender extends AppenderSkeleton {
     }
 
     if (serdeClass != null) {
-      SerdeFactory<LoggingEvent> serdeFactory = Util.<SerdeFactory<LoggingEvent>>getObj(serdeClass);
+      SerdeFactory<LoggingEvent> serdeFactory = Util.getObj(serdeClass, SerdeFactory.class);
       serde = serdeFactory.getSerde(systemName, config);
     } else {
       String serdeKey = String.format(SerializerConfig.SERDE_FACTORY_CLASS(), serdeName);

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
index ff1268c..7ca9b35 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
@@ -19,7 +19,7 @@
 package org.apache.samza.monitor;
 
 import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.Util;
 
 /**
  * Helper class that instantiates the Monitor.
@@ -30,7 +30,7 @@ public class MonitorLoader {
       throws InstantiationException {
       String factoryClass = monitorConfig.getMonitorFactoryClass();
       try {
-        MonitorFactory monitorFactory = ClassLoaderHelper.fromClassName(factoryClass);
+        MonitorFactory monitorFactory = Util.getObj(factoryClass, MonitorFactory.class);
         return monitorFactory.getMonitorInstance(monitorName, monitorConfig, metricsRegistry);
       } catch (Exception e) {
         throw (InstantiationException)

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
index a6e0bb0..45b6a39 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
@@ -24,7 +24,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.rest.resources.DefaultResourceFactory;
 import org.apache.samza.rest.resources.ResourceFactory;
-import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.Util;
 import org.codehaus.jackson.jaxrs.JacksonJsonProvider;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.slf4j.Logger;
@@ -84,7 +84,7 @@ public class SamzaRestApplication extends ResourceConfig {
   private Collection<? extends Object> instantiateFactoryResources(String factoryClassName, Config config)
       throws InstantiationException {
     try {
-      ResourceFactory factory = ClassLoaderHelper.<ResourceFactory>fromClassName(factoryClassName);
+      ResourceFactory factory = Util.getObj(factoryClassName, ResourceFactory.class);
       return factory.getResourceInstances(config);
     } catch (Exception e) {
       throw (InstantiationException)

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
index 492385f..19e006f 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
@@ -27,7 +27,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.rest.model.Job;
 import org.apache.samza.rest.model.JobStatus;
 import org.apache.samza.rest.resources.JobsResourceConfig;
-import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,10 +47,10 @@ public abstract class AbstractJobProxy implements JobProxy {
    * @return        the JobProxy produced by the factory.
    */
   public static JobProxy fromFactory(JobsResourceConfig config) {
-    String jobProxyFactory = config.getJobProxyFactory();
-    if (jobProxyFactory != null && !jobProxyFactory.isEmpty()) {
+    String jobProxyFactoryClassName = config.getJobProxyFactory();
+    if (jobProxyFactoryClassName != null && !jobProxyFactoryClassName.isEmpty()) {
       try {
-        JobProxyFactory factory = ClassLoaderHelper.fromClassName(jobProxyFactory);
+        JobProxyFactory factory = Util.getObj(jobProxyFactoryClassName, JobProxyFactory.class);
         return factory.getJobProxy(config);
       } catch (Exception e) {
         throw new SamzaException(e);

http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
index fbddb30..fd8709f 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
@@ -20,13 +20,14 @@ package org.apache.samza.rest.proxy.job;
 
 import java.util.Set;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.ConfigFactory;
 import org.apache.samza.rest.model.JobStatus;
 import org.apache.samza.rest.model.yarn.YarnApplicationInfo;
 import org.apache.samza.rest.proxy.installation.InstallationFinder;
 import org.apache.samza.rest.proxy.installation.InstallationRecord;
 import org.apache.samza.rest.proxy.installation.SimpleInstallationFinder;
 import org.apache.samza.rest.resources.JobsResourceConfig;
-import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +50,7 @@ public class SimpleYarnJobProxy extends ScriptJobProxy {
   public SimpleYarnJobProxy(JobsResourceConfig config) throws Exception {
     super(config);
     this.installFinder = new SimpleInstallationFinder(config.getInstallationsPath(),
-                                                      ClassLoaderHelper.fromClassName(config.getJobConfigFactory()));
+        Util.getObj(config.getJobConfigFactory(), ConfigFactory.class));
     this.statusProvider = new YarnRestJobStatusProvider(config);
   }