You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/04/18 20:32:28 UTC
[2/3] samza git commit: Misc. Util cleanup
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
new file mode 100644
index 0000000..4b93543
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.util
+
+import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
+import java.util.zip.CRC32
+
+import org.apache.samza.util.Util.info
+
+object FileUtil {
+  /**
+   * Writes a checksum followed by the data to a file.
+   * The CRC32 checksum of the data (see `getChecksum`) is written first as a 64-bit long,
+   * then the data itself is written with `writeUTF`.
+   *
+   * NOTE: `writeUTF` cannot encode a string whose modified UTF-8 form is longer than
+   * 65535 bytes; such data fails with a `java.io.UTFDataFormatException`.
+   *
+   * @param file The file handle to write to
+   * @param data The data to be written to the file
+   */
+  def writeWithChecksum(file: File, data: String): Unit = {
+    val checksum = getChecksum(data)
+    var oos: ObjectOutputStream = null
+    var fos: FileOutputStream = null
+    try {
+      fos = new FileOutputStream(file)
+      oos = new ObjectOutputStream(fos)
+      oos.writeLong(checksum)
+      oos.writeUTF(data)
+    } finally {
+      // Either stream is still null if its constructor threw. Guard the close calls so the
+      // original exception is not masked by a NullPointerException, and always attempt to
+      // release the file stream even if closing the object stream fails.
+      try {
+        if (oos != null) oos.close()
+      } finally {
+        if (fos != null) fos.close()
+      }
+    }
+  }
+
+  /**
+   * Reads from a file that has a checksum prepended to the data, i.e. a file in the format
+   * produced by `writeWithChecksum`.
+   *
+   * @param file The file handle to read from
+   * @return the data read from the file, or null if the stored checksum does not match the
+   *         checksum computed over the data that was read
+   */
+  def readWithChecksum(file: File): String = {
+    var fis: FileInputStream = null
+    var ois: ObjectInputStream = null
+    try {
+      fis = new FileInputStream(file)
+      ois = new ObjectInputStream(fis)
+      val checksumFromFile = ois.readLong()
+      val data = ois.readUTF()
+      if (checksumFromFile == getChecksum(data)) {
+        data
+      } else {
+        info("Checksum match failed. Data in file is corrupted. Skipping content.")
+        null
+      }
+    } finally {
+      // Same null-guarding as in writeWithChecksum: a failed open must surface its own
+      // exception instead of a NullPointerException from the cleanup code.
+      try {
+        if (ois != null) ois.close()
+      } finally {
+        if (fis != null) fis.close()
+      }
+    }
+  }
+
+  /**
+   * Recursively remove a directory (or file), and all sub-directories. Equivalent
+   * to rm -rf. A null argument is ignored. The results of the individual delete
+   * calls are not checked.
+   *
+   * @param file The file or directory to remove
+   */
+  def rm(file: File): Unit = {
+    if (file != null) {
+      if (file.isDirectory) {
+        // listFiles() returns null when the directory cannot be listed
+        val children = file.listFiles()
+        if (children != null) {
+          children.foreach(rm)
+        }
+      }
+      file.delete()
+    }
+  }
+
+  /**
+   * Generates the CRC32 checksum code for any given data.
+   * The string is converted to bytes with the platform default charset.
+   *
+   * @param data The string for which checksum has to be generated
+   * @return long type value representing the checksum
+   */
+  def getChecksum(data: String): Long = {
+    val crc = new CRC32
+    crc.update(data.getBytes)
+    crc.getValue
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala
new file mode 100644
index 0000000..ea5eb5a
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.util
+
+import java.io.{BufferedReader, IOException, InputStream, InputStreamReader}
+import java.net.{HttpURLConnection, URL}
+
+import org.apache.samza.SamzaException
+import org.apache.samza.util.Util.{error, warn}
+
+object HttpUtil {
+
+  /**
+   * Reads a URL and returns the response body as a string. A non-200 response or an IOException
+   * opens a fresh connection and leaves the retry loop running under the supplied backoff strategy;
+   * any other exception is logged and rethrown.
+   *
+   * @param url HTTP URL to read from.
+   * @param timeout how long to wait before timing out when connecting to or reading from the HTTP server.
+   * @param retryBackoff instance of exponentialSleepStrategy that encapsulates info on how long to sleep and retry operation
+   * @return string payload of the body of the HTTP response.
+   * @throws SamzaException if the response code is still not 200 after `retryBackoff.run` returns.
+   */
+  def read(url: URL, timeout: Int = 60000, retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy): String = {
+    var httpConn = getHttpConnection(url, timeout)
+    retryBackoff.run(loop => {
+      if (httpConn.getResponseCode != 200) {
+        warn("Error: " + httpConn.getResponseCode)
+        // getErrorStream may be null when the server sent no error body; readStream tolerates that
+        val errorContent = readStream(httpConn.getErrorStream)
+        warn("Error reading stream, failed with response %s" format errorContent)
+        httpConn = getHttpConnection(url, timeout)
+      } else {
+        loop.done
+      }
+    },
+    (exception, loop) => {
+      exception match {
+        case ioe: IOException =>
+          warn("Error getting response from Job coordinator server. received IOException: %s. Retrying..." format ioe.getClass)
+          httpConn = getHttpConnection(url, timeout)
+        case e: Exception =>
+          loop.done
+          error("Unable to connect to Job coordinator server, received exception", e)
+          throw e
+      }
+    })
+
+    if (httpConn.getResponseCode != 200) {
+      throw new SamzaException("Unable to read JobModel from Jobcoordinator HTTP server")
+    }
+    readStream(httpConn.getInputStream)
+  }
+
+  /**
+   * Opens a connection to the given URL with both the connect and the read timeout set.
+   *
+   * @param url URL to connect to; the opened connection is cast to an HttpURLConnection.
+   * @param timeout value applied as both connect timeout and read timeout.
+   * @return the opened connection.
+   */
+  def getHttpConnection(url: URL, timeout: Int): HttpURLConnection = {
+    val conn = url.openConnection()
+    conn.setConnectTimeout(timeout)
+    conn.setReadTimeout(timeout)
+    conn.asInstanceOf[HttpURLConnection]
+  }
+
+  /**
+   * Drains the stream into a single string and closes it. Lines are concatenated without any
+   * separator, and bytes are decoded with the platform default charset.
+   *
+   * @param stream stream to read; may be null, in which case an empty string is returned.
+   * @return the concatenated lines of the stream.
+   */
+  private def readStream(stream: InputStream): String = {
+    if (stream == null) {
+      ""
+    } else {
+      val br = new BufferedReader(new InputStreamReader(stream))
+      try {
+        Iterator.continually(br.readLine()).takeWhile(_ != null).mkString
+      } finally {
+        // Close even when readLine throws, so a failed read does not leak the connection's stream
+        try {
+          br.close()
+        } finally {
+          stream.close()
+        }
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala b/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala
deleted file mode 100644
index 93c5220..0000000
--- a/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.util;
-
-import java.util.Comparator
-
-/**
- * A comparator that applies a lexicographical comparison on byte arrays.
- */
-class LexicographicComparator extends Comparator[Array[Byte]] {
- def compare(k1: Array[Byte], k2: Array[Byte]): Int = {
- val l = math.min(k1.length, k2.length)
- var i = 0
- while (i < l) {
- if (k1(i) != k2(i))
- return (k1(i) & 0xff) - (k2(i) & 0xff)
- i += 1
- }
- // okay prefixes are equal, the shorter array is less
- k1.length - k2.length
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala
new file mode 100644
index 0000000..f3ba746
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.util
+
+import scala.collection.immutable.Map
+import scala.collection.JavaConverters._
+import scala.runtime.AbstractFunction0
+
+object ScalaJavaUtil {
+
+  /**
+   * Copies the entries of a Java map into a Scala immutable Map.
+   *
+   * @param javaMap the Java map to convert
+   * @tparam K type of the keys
+   * @tparam V type of the values
+   * @return an immutable Scala Map holding the same entries
+   */
+  def toScalaMap[K, V](javaMap: java.util.Map[K, V]): Map[K, V] = {
+    Map.empty[K, V] ++ javaMap.asScala
+  }
+
+  /**
+   * Wraps the provided value in a Scala function, e.g. for use with `Option.getOrElse`
+   * when it is called from Java.
+   *
+   * @param value the value to be wrapped
+   * @tparam T type of the value
+   * @return an AbstractFunction0 that returns the wrapped value when called
+   */
+  def defaultValue[T](value: T): AbstractFunction0[T] = asFunction0(value)
+
+  /**
+   * Wraps the provided Java Supplier in a Scala function, e.g. for use with `Option.getOrElse`
+   * when it is called from Java.
+   *
+   * @param javaFunction the java Supplier function to be wrapped
+   * @tparam T type of the value
+   * @return an AbstractFunction0 that calls the supplier's `get()` each time it is called
+   */
+  def toScalaFunction[T](javaFunction: java.util.function.Supplier[T]): AbstractFunction0[T] =
+    asFunction0(javaFunction.get())
+
+  /** Builds an AbstractFunction0 whose apply() evaluates the by-name argument on every call. */
+  private def asFunction0[T](compute: => T): AbstractFunction0[T] = {
+    new AbstractFunction0[T] {
+      override def apply(): T = compute
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/main/scala/org/apache/samza/util/TimerUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/TimerUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/TimerUtil.scala
new file mode 100644
index 0000000..b538145
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/TimerUtil.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util
+
+import org.apache.samza.metrics.Timer
+
+/**
+ * A mix-in that times code blocks and reports the elapsed time to
+ * [[org.apache.samza.metrics.Timer]] metrics. The time source is the abstract `clock` member.
+ */
+trait TimerUtil {
+  val clock: () => Long
+
+  /**
+   * Runs the code block and updates the given [[org.apache.samza.metrics.Timer]] with the
+   * difference between the `clock` readings taken after and before the block.
+   * The timer is not updated if the block throws.
+   *
+   * @param timer the Timer to update
+   * @param runCodeBlock the code block to time
+   * @tparam T type of the code block's result
+   * @return the result of the code block
+   */
+  def updateTimer[T](timer: Timer)(runCodeBlock: => T): T = {
+    val begin = clock()
+    val result = runCodeBlock
+    recordElapsed(timer, begin)
+    result
+  }
+
+  /**
+   * Runs a code block that has no return value, passing it the `clock` reading taken just
+   * before it starts (presumably in nanoseconds; the unit depends on the `clock` supplied).
+   * Updates the given [[org.apache.samza.metrics.Timer]] with the duration of the block.
+   *
+   * @param timer the Timer to update
+   * @param runCodeBlock the code block to time; receives the starting `clock` reading
+   * @return the same duration that was reported to the timer
+   */
+  def updateTimerAndGetDuration(timer: Timer)(runCodeBlock: Long => Unit): Long = {
+    val begin = clock()
+    runCodeBlock(begin)
+    recordElapsed(timer, begin)
+  }
+
+  /** Reads the clock, reports the time elapsed since `begin` to the timer and returns it. */
+  private def recordElapsed(timer: Timer, begin: Long): Long = {
+    val elapsed = clock() - begin
+    timer.update(elapsed)
+    elapsed
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala b/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
deleted file mode 100644
index 63935a7..0000000
--- a/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.util
-
-import org.apache.samza.metrics.Timer
-
-/**
- * a helper class to facilitate update {@link org.apache.samza.metrics.Timer} metric
- */
-trait TimerUtils {
- val clock: () => Long
-
- /**
- * A helper method to update the {@link org.apache.samza.metrics.Timer} metric.
- * It accepts a {@link org.apache.samza.metrics.Timer} instance and a code block.
- * It updates the Timer instance with the duration of running code block.
- */
- def updateTimer[T](timer: Timer)(runCodeBlock: => T): T = {
- val startingTime = clock()
- val returnValue = runCodeBlock
- timer.update(clock() - startingTime)
- returnValue
- }
-
- /**
- * A helper method to update the {@link org.apache.samza.metrics.Timer} metrics.
- * It accepts a {@link org.apache.samza.metrics.Timer} instance and a code block
- * with no return value. It passes one Long parameter to code block that contains
- * current time in nanoseconds. It updates the Timer instance with the duration of
- * running code block and returns the same duration.
- */
- def updateTimerAndGetDuration(timer: Timer)(runCodeBlock: Long => Unit): Long = {
- val startingTime = clock()
- runCodeBlock(startingTime)
- val duration = clock() - startingTime
- timer.update(duration)
- duration
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index e12c81a..059eb03 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -19,28 +19,25 @@
package org.apache.samza.util
-import java.io._
-import java.lang.management.ManagementFactory
-import java.net._
-import java.util.Random
-import java.util.zip.CRC32
import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.config._
-import org.apache.samza.serializers._
-import org.apache.samza.system.{SystemFactory, SystemStream, SystemStreamPartition}
-import org.apache.samza.{Partition, SamzaException}
+import org.apache.samza.system.SystemStream
+import org.apache.samza.SamzaException
+
+import java.lang.management.ManagementFactory
+import java.net.Inet4Address
+import java.net.InetAddress
+import java.net.NetworkInterface
+import java.util.Random
import scala.collection.JavaConverters._
-import scala.collection.immutable.Map
object Util extends Logging {
val Random = new Random
val ThreadMxBean = ManagementFactory.getThreadMXBean
- def clock: Long = System.currentTimeMillis
/**
* Make an environment variable string safe to pass.
*/
@@ -53,28 +50,9 @@ object Util extends Logging {
startInclusive + Random.nextInt(endExclusive - startInclusive)
/**
- * Recursively remove a directory (or file), and all sub-directories. Equivalent
- * to rm -rf.
+ * Instantiate an object of type T from a given className.
*/
- def rm(file: File) {
- if (file == null) {
- return
- } else if (file.isDirectory) {
- val files = file.listFiles()
- if (files != null) {
- for (f <- files)
- rm(f)
- }
- file.delete()
- } else {
- file.delete()
- }
- }
-
- /**
- * Instantiate a class instance from a given className.
- */
- def getObj[T](className: String) = {
+ def getObj[T](className: String, clazz: Class[T]) = {
try {
Class
.forName(className)
@@ -82,7 +60,7 @@ object Util extends Logging {
.asInstanceOf[T]
} catch {
case e: Throwable => {
- error("Unable to instantiate a class instance for %s." format className, e)
+ error("Unable to create an instance for class %s." format className, e)
throw e
}
}
@@ -109,244 +87,22 @@ object Util extends Logging {
}
/**
- * Makes sure that an object is not null, and throws a NullPointerException
- * if it is.
- */
- def notNull[T](obj: T, msg: String) = if (obj == null) {
- throw new NullPointerException(msg)
- }
-
- /**
- * Returns the name representing the JVM. It usually contains the PID of the process plus some additional information
- * @return String that contains the name representing this JVM
- */
- def getContainerPID(): String = {
- ManagementFactory.getRuntimeMXBean().getName()
- }
-
- /**
- * Overriding read method defined below so that it can be accessed from Java classes with default values
- */
- def read(url: URL, timeout: Int): String = {
- read(url, timeout, new ExponentialSleepStrategy)
- }
-
- /**
- * Reads a URL and returns its body as a string. Does no error handling.
- *
- * @param url HTTP URL to read from.
- * @param timeout How long to wait before timing out when connecting to or reading from the HTTP server.
- * @param retryBackoff Instance of exponentialSleepStrategy that encapsulates info on how long to sleep and retry operation
- * @return String payload of the body of the HTTP response.
- */
- def read(url: URL, timeout: Int = 60000, retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy): String = {
- var httpConn = getHttpConnection(url, timeout)
- retryBackoff.run(loop => {
- if(httpConn.getResponseCode != 200)
- {
- warn("Error: " + httpConn.getResponseCode)
- val errorContent = readStream(httpConn.getErrorStream)
- warn("Error reading stream, failed with response %s" format errorContent)
- httpConn = getHttpConnection(url, timeout)
- }
- else
- {
- loop.done
- }
- },
- (exception, loop) => {
- exception match {
- case ioe: IOException => {
- warn("Error getting response from Job coordinator server. received IOException: %s. Retrying..." format ioe.getClass)
- httpConn = getHttpConnection(url, timeout)
- }
- case e: Exception =>
- loop.done
- error("Unable to connect to Job coordinator server, received exception", e)
- throw e
- }
- })
-
- if(httpConn.getResponseCode != 200) {
- throw new SamzaException("Unable to read JobModel from Jobcoordinator HTTP server")
- }
- readStream(httpConn.getInputStream)
- }
-
- def getHttpConnection(url: URL, timeout: Int): HttpURLConnection = {
- val conn = url.openConnection()
- conn.setConnectTimeout(timeout)
- conn.setReadTimeout(timeout)
- conn.asInstanceOf[HttpURLConnection]
- }
- private def readStream(stream: InputStream): String = {
- val br = new BufferedReader(new InputStreamReader(stream));
- var line: String = null;
- val body = Iterator.continually(br.readLine()).takeWhile(_ != null).mkString
- br.close
- stream.close
- body
- }
-
- /**
- * Generates a coordinator stream name based on the job name and job id
- * for the job. The format of the stream name will be:
- * __samza_coordinator_<JOBNAME>_<JOBID>.
- */
- def getCoordinatorStreamName(jobName: String, jobId: String) = {
- "__samza_coordinator_%s_%s" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
- }
-
- /**
- * Get a job's name and ID given a config. Job ID is defaulted to 1 if not
- * defined in the config, and job name must be defined in config.
- *
- * @return A tuple of (jobName, jobId)
- */
- def getJobNameAndId(config: Config) = {
- (config.getName.getOrElse(throw new ConfigException("Missing required config: job.name")), config.getJobId.getOrElse("1"))
- }
-
- /**
- * Given a job's full config object, build a subset config which includes
- * only the job name, job id, and system config for the coordinator stream.
- */
- def buildCoordinatorStreamConfig(config: Config) = {
- val (jobName, jobId) = getJobNameAndId(config)
- // Build a map with just the system config and job.name/job.id. This is what's required to start the JobCoordinator.
- val map = config.subset(SystemConfig.SYSTEM_PREFIX format config.getCoordinatorSystemName, false).asScala ++
- Map[String, String](
- JobConfig.JOB_NAME -> jobName,
- JobConfig.JOB_ID -> jobId,
- JobConfig.JOB_COORDINATOR_SYSTEM -> config.getCoordinatorSystemName,
- JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS -> String.valueOf(config.getMonitorPartitionChangeFrequency))
- new MapConfig(map.asJava)
- }
-
- /**
- * Get the coordinator system stream from the configuration
- * @param config
- * @return
- */
- def getCoordinatorSystemStream(config: Config) = {
- val systemName = config.getCoordinatorSystemName
- val (jobName, jobId) = Util.getJobNameAndId(config)
- val streamName = Util.getCoordinatorStreamName(jobName, jobId)
- new SystemStream(systemName, streamName)
- }
-
- /**
- * Get the coordinator system factory from the configuration
- * @param config
- * @return
- */
- def getCoordinatorSystemFactory(config: Config) = {
- val systemName = config.getCoordinatorSystemName
- val systemFactoryClassName = config
- .getSystemFactory(systemName)
- .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName))
- Util.getObj[SystemFactory](systemFactoryClassName)
- }
-
- /**
- * The helper function converts a SSP to a string
- * @param ssp System stream partition
- * @return The string representation of the SSP
- */
- def sspToString(ssp: SystemStreamPartition): String = {
- ssp.getSystem() + "." + ssp.getStream() + "." + String.valueOf(ssp.getPartition().getPartitionId())
- }
-
- /**
- * The method converts the string SSP back to a SSP
- * @param ssp The string form of the SSP
- * @return An SSP typed object
- */
- def stringToSsp(ssp: String): SystemStreamPartition = {
- val idx = ssp.indexOf('.');
- val lastIdx = ssp.lastIndexOf('.')
- if (idx < 0 || lastIdx < 0) {
- throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition")
- }
- new SystemStreamPartition(new SystemStream(ssp.substring(0, idx), ssp.substring(idx + 1, lastIdx)),
- new Partition(Integer.parseInt(ssp.substring(lastIdx + 1))))
- }
-
- /**
- * Method to generate the CRC32 checksum code for any given data
- * @param data The string for which checksum has to be generated
- * @return long type value representing the checksum
- * */
- def getChecksumValue(data: String) = {
- val crc = new CRC32
- crc.update(data.getBytes)
- crc.getValue
- }
-
- /**
- * Method that always writes checksum & data to a file
- * Checksum is pre-fixed to the data and is a 32-bit long type data.
- * @param file The file handle to write to
- * @param data The data to be written to the file
- * */
- def writeDataToFile(file: File, data: String) = {
- val checksum = getChecksumValue(data)
- var oos: ObjectOutputStream = null
- var fos: FileOutputStream = null
- try {
- fos = new FileOutputStream(file)
- oos = new ObjectOutputStream(fos)
- oos.writeLong(checksum)
- oos.writeUTF(data)
- } finally {
- oos.close()
- fos.close()
- }
- }
-
- /**
- * Method to read from a file that has a checksum prepended to the data
- * @param file The file handle to read from
- * */
- def readDataFromFile(file: File) = {
- var fis: FileInputStream = null
- var ois: ObjectInputStream = null
- try {
- fis = new FileInputStream(file)
- ois = new ObjectInputStream(fis)
- val checksumFromFile = ois.readLong()
- val data = ois.readUTF()
- if(checksumFromFile == getChecksumValue(data)) {
- data
- } else {
- info("Checksum match failed. Data in file is corrupted. Skipping content.")
- null
- }
- } finally {
- ois.close()
- fis.close()
- }
- }
-
- /**
- * Convert a java map to a Scala map
- * */
- def javaMapAsScalaMap[T, K](javaMap: java.util.Map[T, K]): Map[T, K] = {
- javaMap.asScala.toMap
- }
-
- /**
- * Returns the the first host address which is not the loopback address, or {@link java.net.InetAddress#getLocalHost InetAddress.getLocalhost()} as a fallback
+ * Returns the first host address which is not the loopback address, or [[java.net.InetAddress#getLocalHost]] as a fallback
*
- * @return the {@link java.net.InetAddress InetAddress} which represents the localhost
+ * @return the [[java.net.InetAddress]] which represents the localhost
*/
def getLocalHost: InetAddress = {
val localHost = InetAddress.getLocalHost
if (localHost.isLoopbackAddress) {
debug("Hostname %s resolves to a loopback address, trying to resolve an external IP address.".format(localHost.getHostName))
- val networkInterfaces = if (System.getProperty("os.name").startsWith("Windows")) NetworkInterface.getNetworkInterfaces.asScala.toList else NetworkInterface.getNetworkInterfaces.asScala.toList.reverse
+ val networkInterfaces = if (System.getProperty("os.name").startsWith("Windows")) {
+ NetworkInterface.getNetworkInterfaces.asScala.toList
+ } else {
+ NetworkInterface.getNetworkInterfaces.asScala.toList.reverse
+ }
for (networkInterface <- networkInterfaces) {
- val addresses = networkInterface.getInetAddresses.asScala.toList.filterNot(address => address.isLinkLocalAddress || address.isLoopbackAddress)
+ val addresses = networkInterface.getInetAddresses.asScala.toList
+ .filterNot(address => address.isLinkLocalAddress || address.isLoopbackAddress)
if (addresses.nonEmpty) {
val address = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head)
debug("Found an external IP address %s which represents the localhost.".format(address.getHostAddress))
@@ -358,58 +114,6 @@ object Util extends Logging {
}
/**
- * A helper function which returns system's default serde factory class according to the
- * serde name. If not found, throw exception.
- */
- def defaultSerdeFactoryFromSerdeName(serdeName: String) = {
- info("looking for default serdes")
-
- val serde = serdeName match {
- case "byte" => classOf[ByteSerdeFactory].getCanonicalName
- case "bytebuffer" => classOf[ByteBufferSerdeFactory].getCanonicalName
- case "integer" => classOf[IntegerSerdeFactory].getCanonicalName
- case "json" => classOf[JsonSerdeFactory].getCanonicalName
- case "long" => classOf[LongSerdeFactory].getCanonicalName
- case "serializable" => classOf[SerializableSerdeFactory[java.io.Serializable]].getCanonicalName
- case "string" => classOf[StringSerdeFactory].getCanonicalName
- case "double" => classOf[DoubleSerdeFactory].getCanonicalName
- case _ => throw new SamzaException("defaultSerdeFactoryFromSerdeName: No class defined for serde %s" format serdeName)
- }
- info("use default serde %s for %s" format (serde, serdeName))
- serde
- }
-
- /**
- * Add the supplied arguments and handle overflow by clamping the resulting sum to
- * {@code Long.MinValue} if the sum would have been less than {@code Long.MinValue} or
- * {@code Long.MaxValue} if the sum would have been greater than {@code Long.MaxValue}.
- *
- * @param lhs left hand side of sum
- * @param rhs right hand side of sum
- * @return the sum if no overflow occurs, or the clamped extreme if it does.
- */
- def clampAdd(lhs: Long, rhs: Long): Long = {
- val sum = lhs + rhs
-
- // From "Hacker's Delight", overflow occurs IFF both operands have the same sign and the
- // sign of the sum differs from the operands. Here we're doing a basic bitwise check that
- // collapses 6 branches down to 2. The expression {@code lhs ^ rhs} will have the high-order
- // bit set to true IFF the signs are different.
- if ((~(lhs ^ rhs) & (lhs ^ sum)) < 0) {
- return if (lhs >= 0) Long.MaxValue else Long.MinValue
- }
-
- sum
- }
-
- /**
- * Implicitly convert the Java TimerClock to Scala clock function which returns long timestamp.
- * @param c Java TimeClock
- * @return Scala clock function
- */
- implicit def asScalaClock(c: HighResolutionClock): () => Long = () => c.nanoTime()
-
- /**
* Re-writes configuration using a ConfigRewriter, if one is defined. If
* there is no ConfigRewriter defined for the job, then this method is a
* no-op.
@@ -419,10 +123,10 @@ object Util extends Logging {
*/
def rewriteConfig(config: Config): Config = {
def rewrite(c: Config, rewriterName: String): Config = {
- val klass = config
+ val rewriterClassName = config
.getConfigRewriterClass(rewriterName)
.getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
- val rewriter = Util.getObj[ConfigRewriter](klass)
+ val rewriter = Util.getObj(rewriterClassName, classOf[ConfigRewriter])
info("Re-writing config with " + rewriter)
rewriter.rewrite(rewriterName, c)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
index 6413413..e537a91 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
@@ -26,8 +26,8 @@ import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.system.*;
+import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
-import org.apache.samza.util.Util;
import java.io.IOException;
import java.util.ArrayList;
@@ -88,7 +88,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
if (jobId == null) {
jobId = "1";
}
- String streamName = Util.getCoordinatorStreamName(jobName, jobId);
+ String streamName = CoordinatorStreamUtil.getCoordinatorStreamName(jobName, jobId);
SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, streamName, new Partition(0));
mockConsumer = new MockCoordinatorStreamWrappedConsumer(systemStreamPartition, config);
return mockConsumer;
@@ -97,7 +97,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
private SystemStream getCoordinatorSystemStream(Config config) {
assertNotNull(config.get("job.coordinator.system"));
assertNotNull(config.get("job.name"));
- return new SystemStream(config.get("job.coordinator.system"), Util.getCoordinatorStreamName(config.get("job.name"),
+ return new SystemStream(config.get("job.coordinator.system"), CoordinatorStreamUtil.getCoordinatorStreamName(config.get("job.name"),
config.get("job.id") == null ? "1" : config.get("job.id")));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/test/java/org/apache/samza/util/TestMathUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestMathUtils.java b/samza-core/src/test/java/org/apache/samza/util/TestMathUtils.java
index 46e0735..2f95016 100644
--- a/samza-core/src/test/java/org/apache/samza/util/TestMathUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/util/TestMathUtils.java
@@ -20,43 +20,58 @@
package org.apache.samza.util;
import com.google.common.collect.ImmutableList;
-import junit.framework.Assert;
-import org.apache.samza.operators.util.MathUtils;
-import org.junit.Test;
import java.util.Collections;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
public class TestMathUtils {
@Test(expected = IllegalArgumentException.class)
public void testGcdWithNullInputs() {
- MathUtils.gcd(null);
+ MathUtil.gcd(null);
}
@Test(expected = IllegalArgumentException.class)
public void testGcdWithEmptyInputs() {
- MathUtils.gcd(Collections.emptyList());
+ MathUtil.gcd(Collections.emptyList());
}
@Test
public void testGcdWithValidInputs() {
// gcd(x, x) = x
- Assert.assertEquals(2, MathUtils.gcd(ImmutableList.of(2L, 2L)));
- Assert.assertEquals(15, MathUtils.gcd(ImmutableList.of(15L)));
- Assert.assertEquals(1, MathUtils.gcd(ImmutableList.of(1L)));
+ assertEquals(2, MathUtil.gcd(ImmutableList.of(2L, 2L)));
+ assertEquals(15, MathUtil.gcd(ImmutableList.of(15L)));
+ assertEquals(1, MathUtil.gcd(ImmutableList.of(1L)));
// gcd(0,x) = x
- Assert.assertEquals(2, MathUtils.gcd(ImmutableList.of(2L, 0L)));
+ assertEquals(2, MathUtil.gcd(ImmutableList.of(2L, 0L)));
// gcd(1,x) = 1
- Assert.assertEquals(1, MathUtils.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L, 1L)));
+ assertEquals(1, MathUtil.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L, 1L)));
// other happy path test cases
- Assert.assertEquals(10, MathUtils.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L, 0L)));
- Assert.assertEquals(10, MathUtils.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L)));
- Assert.assertEquals(5, MathUtils.gcd(ImmutableList.of(25L, 35L, 45L, 55L)));
+ assertEquals(10, MathUtil.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L, 0L)));
+ assertEquals(10, MathUtil.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L)));
+ assertEquals(5, MathUtil.gcd(ImmutableList.of(25L, 35L, 45L, 55L)));
- Assert.assertEquals(1, MathUtils.gcd(ImmutableList.of(25L, 35L, 45L, 55L, 13L)));
+ assertEquals(1, MathUtil.gcd(ImmutableList.of(25L, 35L, 45L, 55L, 13L)));
}
+ @Test
+ public void testClampAdd() {
+ assertEquals(0, MathUtil.clampAdd(0, 0));
+ assertEquals(2, MathUtil.clampAdd(1, 1));
+ assertEquals(-2, MathUtil.clampAdd(-1, -1));
+ assertEquals(Long.MAX_VALUE, MathUtil.clampAdd(Long.MAX_VALUE, 0));
+ assertEquals(Long.MAX_VALUE - 1, MathUtil.clampAdd(Long.MAX_VALUE, -1));
+ assertEquals(Long.MAX_VALUE, MathUtil.clampAdd(Long.MAX_VALUE, 1));
+ assertEquals(Long.MAX_VALUE, MathUtil.clampAdd(Long.MAX_VALUE, Long.MAX_VALUE));
+ assertEquals(Long.MIN_VALUE, MathUtil.clampAdd(Long.MIN_VALUE, 0));
+ assertEquals(Long.MIN_VALUE, MathUtil.clampAdd(Long.MIN_VALUE, -1));
+ assertEquals(Long.MIN_VALUE + 1, MathUtil.clampAdd(Long.MIN_VALUE, 1));
+ assertEquals(Long.MIN_VALUE, MathUtil.clampAdd(Long.MIN_VALUE, Long.MIN_VALUE));
+ assertEquals(-1, MathUtil.clampAdd(Long.MAX_VALUE, Long.MIN_VALUE));
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/test/scala/org/apache/samza/config/TestSerializerConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestSerializerConfig.scala b/samza-core/src/test/scala/org/apache/samza/config/TestSerializerConfig.scala
new file mode 100644
index 0000000..83c901c
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/config/TestSerializerConfig.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import org.apache.samza.SamzaException
+import org.apache.samza.serializers.ByteSerdeFactory
+import org.apache.samza.serializers.DoubleSerdeFactory
+import org.apache.samza.serializers.IntegerSerdeFactory
+import org.apache.samza.serializers.JsonSerdeFactory
+import org.apache.samza.serializers.LongSerdeFactory
+import org.apache.samza.config.SerializerConfig.getSerdeFactoryName
+import org.apache.samza.serializers.SerializableSerdeFactory
+import org.apache.samza.serializers.StringSerdeFactory
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+class TestSerializerConfig {
+ @Test
+ def testGetSerdeFactoryName {
+ val config = new MapConfig
+ assertEquals(classOf[ByteSerdeFactory].getName, getSerdeFactoryName("byte"))
+ assertEquals(classOf[IntegerSerdeFactory].getName, getSerdeFactoryName("integer"))
+ assertEquals(classOf[JsonSerdeFactory].getName, getSerdeFactoryName("json"))
+ assertEquals(classOf[LongSerdeFactory].getName, getSerdeFactoryName("long"))
+ assertEquals(classOf[SerializableSerdeFactory[java.io.Serializable@unchecked]].getName, getSerdeFactoryName("serializable"))
+ assertEquals(classOf[StringSerdeFactory].getName, getSerdeFactoryName("string"))
+ assertEquals(classOf[DoubleSerdeFactory].getName, getSerdeFactoryName("double"))
+
+ // Expect a SamzaException when the serde name is not recognized
+ var throwSamzaException = false
+ try {
+ getSerdeFactoryName("otherName")
+ } catch {
+ case e: SamzaException => throwSamzaException = true
+ case _: Exception =>
+ }
+ assertTrue(throwSamzaException)
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index cf05b3b..95a0a11 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -25,7 +25,7 @@ import org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFacto
import org.apache.samza.job.local.ProcessJobFactory
import org.apache.samza.job.local.ThreadJobFactory
import org.apache.samza.serializers.model.SamzaObjectMapper
-import org.apache.samza.util.Util
+import org.apache.samza.util.{HttpUtil, Util}
import org.junit.{After, Before, Test}
import org.junit.Assert._
@@ -119,8 +119,9 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
assertEquals(new MapConfig(config.asJava), coordinator.jobModel.getConfig)
assertEquals(expectedJobModel, coordinator.jobModel)
+ val response = HttpUtil.read(coordinator.server.getUrl)
// Verify that the JobServlet is serving the correct jobModel
- val jobModelFromCoordinatorUrl = SamzaObjectMapper.getObjectMapper.readValue(Util.read(coordinator.server.getUrl), classOf[JobModel])
+ val jobModelFromCoordinatorUrl = SamzaObjectMapper.getObjectMapper.readValue(response, classOf[JobModel])
assertEquals(expectedJobModel, jobModelFromCoordinatorUrl)
coordinator.stop
@@ -245,7 +246,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
val systemFactoryClassName = config
.getSystemFactory(systemName)
.getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
- val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+ val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory])
systemName -> systemFactory.getAdmin(systemName, config)
}).toMap
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
index f1dcc3d..6ca4070 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
@@ -19,11 +19,10 @@
package org.apache.samza.coordinator.server
-import org.apache.samza.util.Util
+import org.apache.samza.util.{HttpUtil, Util}
import org.junit.Assert._
import org.junit.Test
import java.net.URL
-import org.eclipse.jetty.server.Connector
class TestHttpServer {
@Test
@@ -32,9 +31,9 @@ class TestHttpServer {
try {
server.addServlet("/basic", new BasicServlet())
server.start
- val body = Util.read(new URL(server.getUrl + "/basic"))
+ val body = HttpUtil.read(new URL(server.getUrl + "/basic"))
assertEquals("{\"foo\":\"bar\"}", body)
- val css = Util.read(new URL(server.getUrl + "/css/ropa-sans.css"))
+ val css = HttpUtil.read(new URL(server.getUrl + "/css/ropa-sans.css"))
assertTrue(css.contains("RopaSans"))
} finally {
server.stop
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
index 774230c..cd6c5be 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
@@ -31,6 +31,9 @@ import org.apache.samza.system.SystemStream
import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.Partition
+import org.apache.samza.SamzaException
+import org.apache.samza.config.MapConfig
+import org.apache.samza.util.Util
class TestSerdeManager {
@Test
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index 90a4c01..bbdb819 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -29,7 +29,7 @@ import org.apache.samza.container.TaskName
import org.apache.samza.storage.StoreProperties.StorePropertiesBuilder
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
import org.apache.samza.system._
-import org.apache.samza.util.{SystemClock, Util}
+import org.apache.samza.util.{FileUtil, SystemClock}
import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.mockito.Matchers._
@@ -56,8 +56,8 @@ class TestTaskStorageManager extends MockitoSugar {
@After
def tearDownTestDirs() {
- Util.rm(TaskStorageManagerBuilder.defaultStoreBaseDir)
- Util.rm(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir)
+ FileUtil.rm(TaskStorageManagerBuilder.defaultStoreBaseDir)
+ FileUtil.rm(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir)
}
/**
@@ -125,7 +125,7 @@ class TestTaskStorageManager extends MockitoSugar {
// Test 2: flush should update the offset file
taskManager.flush()
assertTrue(offsetFile.exists())
- assertEquals("50", Util.readDataFromFile(offsetFile))
+ assertEquals("50", FileUtil.readWithChecksum(offsetFile))
// Test 3: Update sspMetadata before shutdown and verify that offset file is updated correctly
metadata = new SystemStreamMetadata("testStream", new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
@@ -142,7 +142,7 @@ class TestTaskStorageManager extends MockitoSugar {
taskManager.stop()
assertTrue(storeFile.exists())
assertTrue(offsetFile.exists())
- assertEquals("100", Util.readDataFromFile(offsetFile))
+ assertEquals("100", FileUtil.readWithChecksum(offsetFile))
// Test 4: Initialize again with an updated sspMetadata; Verify that it restores from the correct offset
@@ -274,7 +274,7 @@ class TestTaskStorageManager extends MockitoSugar {
@Test
def testLoggedStoreDirsWithOffsetFileAreNotDeletedInCleanBaseDirs() {
val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET")
- Util.writeDataToFile(offsetFilePath, "100")
+ FileUtil.writeWithChecksum(offsetFilePath, "100")
val taskStorageManager = new TaskStorageManagerBuilder()
.addStore(loggedStore, true)
@@ -296,7 +296,7 @@ class TestTaskStorageManager extends MockitoSugar {
val storeDirectory = TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName)
val offsetFile = new File(storeDirectory, "OFFSET")
offsetFile.createNewFile()
- Util.writeDataToFile(offsetFile, "Test Offset Data")
+ FileUtil.writeWithChecksum(offsetFile, "Test Offset Data")
offsetFile.setLastModified(0)
val taskStorageManager = new TaskStorageManagerBuilder().addStore(store, false)
.addStore(loggedStore, true)
@@ -315,7 +315,7 @@ class TestTaskStorageManager extends MockitoSugar {
@Test
def testOffsetFileIsRemovedInCleanBaseDirsForInMemoryLoggedStore() {
val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET")
- Util.writeDataToFile(offsetFilePath, "100")
+ FileUtil.writeWithChecksum(offsetFilePath, "100")
val taskStorageManager = new TaskStorageManagerBuilder()
.addStore(loggedStore, false)
@@ -352,7 +352,7 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
- assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath))
+ assertEquals("Found incorrect value in offset file!", "100", FileUtil.readWithChecksum(offsetFilePath))
}
/**
@@ -386,7 +386,7 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
- assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath))
+ assertEquals("Found incorrect value in offset file!", "100", FileUtil.readWithChecksum(offsetFilePath))
assertTrue("Offset file got created for a store that is not persisted to the disk!!", !anotherOffsetPath.exists())
}
@@ -416,7 +416,7 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
- assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath))
+ assertEquals("Found incorrect value in offset file!", "100", FileUtil.readWithChecksum(offsetFilePath))
//Invoke test method again
taskStorageManager.flush()
@@ -430,7 +430,7 @@ class TestTaskStorageManager extends MockitoSugar {
val partition = new Partition(0)
val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + "OFFSET")
- Util.writeDataToFile(offsetFilePath, "100")
+ FileUtil.writeWithChecksum(offsetFilePath, "100")
val mockSystemAdmin = mock[SystemAdmin]
var mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream" , Map(partition -> new SystemStreamPartitionMetadata("20", "139", "140")).asJava))
@@ -449,7 +449,7 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
- assertEquals("Found incorrect value in offset file!", "139", Util.readDataFromFile(offsetFilePath))
+ assertEquals("Found incorrect value in offset file!", "139", FileUtil.readWithChecksum(offsetFilePath))
// Flush again
mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream" , Map(partition -> new SystemStreamPartitionMetadata("20", "193", "194")).asJava))
@@ -461,7 +461,7 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
- assertEquals("Found incorrect value in offset file!", "193", Util.readDataFromFile(offsetFilePath))
+ assertEquals("Found incorrect value in offset file!", "193", FileUtil.readWithChecksum(offsetFilePath))
}
@Test
@@ -556,7 +556,7 @@ class TestTaskStorageManager extends MockitoSugar {
if (writeOffsetFile) {
val offsetFile = new File(storeDirectory, "OFFSET")
if (fileOffset != null) {
- Util.writeDataToFile(offsetFile, fileOffset)
+ FileUtil.writeWithChecksum(offsetFile, fileOffset)
} else {
// Write garbage to produce a null result when it's read
val fos = new FileOutputStream(offsetFile)
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
new file mode 100644
index 0000000..5bb6da7
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.util
+
+import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
+
+import org.junit.Assert.{assertEquals, assertNull, assertTrue}
+import org.junit.Test
+
+class TestFileUtil {
+ val data = "100"
+ val checksum = FileUtil.getChecksum(data)
+ val file = new File(System.getProperty("java.io.tmpdir"), "test")
+
+ @Test
+ def testWriteDataToFile() {
+ // Invoke test
+ FileUtil.writeWithChecksum(file, data)
+
+ // Check that file exists
+ assertTrue("File was not created!", file.exists())
+ val fis = new FileInputStream(file)
+ val ois = new ObjectInputStream(fis)
+
+ // Check content of the file is as expected
+ assertEquals(checksum, ois.readLong())
+ assertEquals(data, ois.readUTF())
+ ois.close()
+ fis.close()
+ }
+
+ @Test
+ def testReadDataFromFile() {
+ // Setup
+ val fos = new FileOutputStream(file)
+ val oos = new ObjectOutputStream(fos)
+ oos.writeLong(checksum)
+ oos.writeUTF(data)
+ oos.close()
+ fos.close()
+
+ // Invoke test
+ val result = FileUtil.readWithChecksum(file)
+
+ // Check data returned
+ assertEquals(data, result)
+ }
+
+ @Test
+ def testReadInvalidDataFromFile() {
+ // Write a bogus checksum (1) ahead of the data so that reading the file is expected to yield null
+ val fos = new FileOutputStream(file)
+ val oos = new ObjectOutputStream(fos)
+ oos.writeLong(1)
+ oos.writeUTF("Junk Data")
+ oos.close()
+ fos.close()
+
+ // Invoke test
+ val result = FileUtil.readWithChecksum(file)
+
+ // Check data returned
+ assertNull(result)
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index fae735b..f0b8a17 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -19,7 +19,6 @@
package org.apache.samza.util
-import java.io._
import org.junit.Assert._
import org.junit.Test
import org.apache.samza.config.MapConfig
@@ -27,116 +26,21 @@ import org.apache.samza.serializers._
import org.apache.samza.SamzaException
class TestUtil {
-
- val data = "100"
- val checksum = Util.getChecksumValue(data)
- val file = new File(System.getProperty("java.io.tmpdir"), "test")
-
- @Test
- def testWriteDataToFile() {
- // Invoke test
- Util.writeDataToFile(file, data)
-
- // Check that file exists
- assertTrue("File was not created!", file.exists())
- val fis = new FileInputStream(file)
- val ois = new ObjectInputStream(fis)
-
- // Check content of the file is as expected
- assertEquals(checksum, ois.readLong())
- assertEquals(data, ois.readUTF())
- ois.close()
- fis.close()
- }
-
- @Test
- def testReadDataFromFile() {
- // Setup
- val fos = new FileOutputStream(file)
- val oos = new ObjectOutputStream(fos)
- oos.writeLong(checksum)
- oos.writeUTF(data)
- oos.close()
- fos.close()
-
- // Invoke test
- val result = Util.readDataFromFile(file)
-
- // Check data returned
- assertEquals(data, result)
- }
-
- @Test
- def testReadInvalidDataFromFile() {
- // Write garbage to produce a null result when it's read
- val fos = new FileOutputStream(file)
- val oos = new ObjectOutputStream(fos)
- oos.writeLong(1)
- oos.writeUTF("Junk Data")
- oos.close()
- fos.close()
-
- // Invoke test
- val result = Util.readDataFromFile(file)
-
- // Check data returned
- assertNull(result)
- }
-
@Test
def testGetLocalHost(): Unit = {
assertNotNull(Util.getLocalHost)
}
@Test
- def testDefaultSerdeFactoryFromSerdeName {
- import Util._
- val config = new MapConfig
- assertEquals(classOf[ByteSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("byte"))
- assertEquals(classOf[IntegerSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("integer"))
- assertEquals(classOf[JsonSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("json"))
- assertEquals(classOf[LongSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("long"))
- assertEquals(classOf[SerializableSerdeFactory[java.io.Serializable@unchecked]].getName, defaultSerdeFactoryFromSerdeName("serializable"))
- assertEquals(classOf[StringSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("string"))
- assertEquals(classOf[DoubleSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("double"))
-
- // throw SamzaException if can not find the correct serde
- var throwSamzaException = false
- try {
- defaultSerdeFactoryFromSerdeName("otherName")
- } catch {
- case e: SamzaException => throwSamzaException = true
- case _: Exception =>
- }
- assertTrue(throwSamzaException)
- }
-
- @Test
- def testClampAdd() {
- assertEquals(0, Util.clampAdd(0, 0))
- assertEquals(2, Util.clampAdd(1, 1))
- assertEquals(-2, Util.clampAdd(-1, -1))
- assertEquals(Long.MaxValue, Util.clampAdd(Long.MaxValue, 0))
- assertEquals(Long.MaxValue - 1, Util.clampAdd(Long.MaxValue, -1))
- assertEquals(Long.MaxValue, Util.clampAdd(Long.MaxValue, 1))
- assertEquals(Long.MaxValue, Util.clampAdd(Long.MaxValue, Long.MaxValue))
- assertEquals(Long.MinValue, Util.clampAdd(Long.MinValue, 0))
- assertEquals(Long.MinValue, Util.clampAdd(Long.MinValue, -1))
- assertEquals(Long.MinValue + 1, Util.clampAdd(Long.MinValue, 1))
- assertEquals(Long.MinValue, Util.clampAdd(Long.MinValue, Long.MinValue))
- assertEquals(-1, Util.clampAdd(Long.MaxValue, Long.MinValue))
- }
-
- @Test
def testGetObjExistingClass() {
- val obj = Util.getObj[MapConfig]("org.apache.samza.config.MapConfig")
+ val obj = Util.getObj("org.apache.samza.config.MapConfig", classOf[MapConfig])
assertNotNull(obj)
assertEquals(classOf[MapConfig], obj.getClass())
}
@Test(expected = classOf[ClassNotFoundException])
def testGetObjNonexistentClass() {
- Util.getObj("this.class.does.NotExist")
+ Util.getObj("this.class.does.NotExist", classOf[Object])
assert(false, "This should not get hit.")
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
index 074323f..5c8328c 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
@@ -82,7 +82,7 @@ public class ElasticsearchSystemFactory implements SystemFactory {
protected static IndexRequestFactory getIndexRequestFactory(ElasticsearchConfig config) {
if (config.getIndexRequestFactoryClassName().isPresent()) {
- return (IndexRequestFactory) Util.getObj(config.getIndexRequestFactoryClassName().get());
+ return Util.getObj(config.getIndexRequestFactoryClassName().get(), IndexRequestFactory.class);
} else {
return new DefaultIndexRequestFactory();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
index 79bca5b..16de121 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
@@ -24,14 +24,14 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.samza.system.hdfs.writer.HdfsWriter
import org.apache.samza.system.{OutgoingMessageEnvelope, SystemProducer}
-import org.apache.samza.util.{Logging, TimerUtils}
+import org.apache.samza.util.{Logging, TimerUtil}
import scala.collection.mutable.{Map => MMap}
class HdfsSystemProducer(
systemName: String, clientId: String, config: HdfsConfig, metrics: HdfsSystemProducerMetrics,
- val clock: () => Long = () => System.currentTimeMillis) extends SystemProducer with Logging with TimerUtils {
+ val clock: () => Long = () => System.currentTimeMillis) extends SystemProducer with Logging with TimerUtil {
val dfs = FileSystem.newInstance(new Configuration(true))
val writers: MMap[String, HdfsWriter[_]] = MMap.empty[String, HdfsWriter[_]]
private val lock = new Object //synchronization lock for thread safe access
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 48d6671..8d4098f 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -42,7 +42,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
.getSystemFactory(checkpointSystemName)
.getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format checkpointSystemName))
- val checkpointSystemFactory = Util.getObj[SystemFactory](checkpointSystemFactoryName)
+ val checkpointSystemFactory = Util.getObj(checkpointSystemFactoryName, classOf[SystemFactory])
val checkpointTopic = KafkaUtil.getCheckpointTopic(jobName, jobId, config)
info(s"Creating a KafkaCheckpointManager to consume from $checkpointTopic")
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index 9eaf895..2a17df8 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -34,14 +34,14 @@ import org.apache.samza.system.SystemProducerException
import org.apache.samza.util.ExponentialSleepStrategy
import org.apache.samza.util.KafkaUtil
import org.apache.samza.util.Logging
-import org.apache.samza.util.TimerUtils
+import org.apache.samza.util.TimerUtil
class KafkaSystemProducer(systemName: String,
retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
getProducer: () => Producer[Array[Byte], Array[Byte]],
metrics: KafkaSystemProducerMetrics,
val clock: () => Long = () => System.nanoTime,
- val dropProducerExceptions: Boolean = false) extends SystemProducer with Logging with TimerUtils {
+ val dropProducerExceptions: Boolean = false) extends SystemProducer with Logging with TimerUtil {
// Represents a fatal error that caused the producer to close.
val fatalException: AtomicReference[SystemProducerException] = new AtomicReference[SystemProducerException]()
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index 51af518..125cf61 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -27,7 +27,7 @@ import kafka.api.TopicMetadata;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.StreamValidationException;
import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.util.Util;
+import org.apache.samza.util.ScalaJavaUtil;
import org.junit.Test;
import org.mockito.Mockito;
@@ -58,7 +58,7 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
Properties coordProps = new Properties();
Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
- KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, Util.javaMapAsScalaMap(changeLogMap)));
+ KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, ScalaJavaUtil.toScalaMap(changeLogMap)));
StreamSpec spec = StreamSpec.createCoordinatorStreamSpec(STREAM, SYSTEM());
Mockito.doAnswer(invocationOnMock -> {
@@ -90,7 +90,7 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
- KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, Util.javaMapAsScalaMap(changeLogMap)));
+ KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, ScalaJavaUtil.toScalaMap(changeLogMap)));
StreamSpec spec = StreamSpec.createChangeLogStreamSpec(STREAM, SYSTEM(), PARTITIONS);
Mockito.doAnswer(invocationOnMock -> {
@@ -123,7 +123,7 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
- KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, Util.javaMapAsScalaMap(changeLogMap)));
+ KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, ScalaJavaUtil.toScalaMap(changeLogMap)));
StreamSpec spec = StreamSpec.createChangeLogStreamSpec(STREAM, SYSTEM(), PARTITIONS);
Mockito.doAnswer(invocationOnMock -> {
StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 03b0d2c..71718b0 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -191,7 +191,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
.getSystemFactory(systemName)
.getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName))
- val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+ val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory])
val spec = new KafkaStreamSpec("id", cpTopic, checkpointSystemName, 1, 1, false, props)
new KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, config, new NoOpMetricsRegistry, serde)
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
index 1fa78f8..9dca23c 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
@@ -25,6 +25,7 @@ import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaSerializerConfig;
import org.apache.samza.config.JavaStorageConfig;
+import org.apache.samza.config.SerializerConfig$;
import org.apache.samza.container.SamzaContainerContext;
import org.apache.samza.container.TaskName;
import org.apache.samza.metrics.MetricsRegistryMap;
@@ -125,8 +126,8 @@ public class RocksDbKeyValueReader {
private Serde<Object> getSerdeFromName(String name, JavaSerializerConfig serializerConfig) {
String serdeClassName = serializerConfig.getSerdeClass(name);
if (serdeClassName == null) {
- serdeClassName = Util.defaultSerdeFactoryFromSerdeName(name);
+ serdeClassName = SerializerConfig$.MODULE$.getSerdeFactoryName(name);
}
- return Util.<SerdeFactory<Object>> getObj(serdeClassName).getSerde(name, serializerConfig);
+ return Util.getObj(serdeClassName, SerdeFactory.class).getSerde(name, serializerConfig);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index eae7da2..856cc4e 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -20,12 +20,13 @@
package org.apache.samza.storage.kv
import java.io.File
+import java.util.Comparator
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantReadWriteLock
import org.apache.samza.SamzaException
import org.apache.samza.config.Config
-import org.apache.samza.util.{LexicographicComparator, Logging}
+import org.apache.samza.util.Logging
import org.rocksdb.{TtlDB, _}
object RocksDbKeyValueStore extends Logging {
@@ -301,4 +302,21 @@ class RocksDbKeyValueStore(
super.hasNext() && comparator.compare(peekKey(), to) < 0
}
}
+
+ /**
+ * A comparator that orders byte arrays lexicographically, comparing each byte as an unsigned value (0-255).
+ */
+ class LexicographicComparator extends Comparator[Array[Byte]] {
+ def compare(k1: Array[Byte], k2: Array[Byte]): Int = {
+ val l = math.min(k1.length, k2.length) // only the overlapping prefix can be compared index by index
+ var i = 0
+ while (i < l) {
+ if (k1(i) != k2(i))
+ return (k1(i) & 0xff) - (k2(i) & 0xff) // mask to 0-255 so the first differing byte decides, unsigned
+ i += 1
+ }
+ // the common prefix is equal, so the shorter array sorts first (result is 0 when lengths match)
+ k1.length - k2.length
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
index e3a2970..da80560 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
@@ -25,12 +25,11 @@ import org.apache.samza.SamzaException
import org.apache.samza.container.SamzaContainerContext
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.serializers.Serde
-import org.apache.samza.storage.{StoreProperties, StorageEngine, StorageEngineFactory}
+import org.apache.samza.storage.{StorageEngine, StorageEngineFactory, StoreProperties}
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.task.MessageCollector
import org.apache.samza.config.MetricsConfig.Config2Metrics
-import org.apache.samza.util.HighResolutionClock
-import org.apache.samza.util.Util.asScalaClock
+import org.apache.samza.util.{HighResolutionClock, ScalaJavaUtil}
/**
* A key value storage engine factory implementation
@@ -152,7 +151,8 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
}
}
- new KeyValueStorageEngine(storePropertiesBuilder.build(), nullSafeStore, rawStore, keyValueStorageEngineMetrics, batchSize, clock)
+ new KeyValueStorageEngine(storePropertiesBuilder.build(), nullSafeStore, rawStore,
+ keyValueStorageEngineMetrics, batchSize, () => clock.nanoTime())
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index 373e18a..5f7bbd8 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -22,7 +22,7 @@ package org.apache.samza.storage.kv
import org.apache.samza.util.Logging
import org.apache.samza.storage.{StoreProperties, StorageEngine}
import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.util.TimerUtils
+import org.apache.samza.util.TimerUtil
import scala.collection.JavaConverters._
@@ -37,7 +37,7 @@ class KeyValueStorageEngine[K, V](
rawStore: KeyValueStore[Array[Byte], Array[Byte]],
metrics: KeyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics,
batchSize: Int = 500,
- val clock: () => Long = { System.nanoTime }) extends StorageEngine with KeyValueStore[K, V] with TimerUtils with Logging {
+ val clock: () => Long = { System.nanoTime }) extends StorageEngine with KeyValueStore[K, V] with TimerUtil with Logging {
var count = 0
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
index 3cc35d3..7adffa9 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
@@ -19,8 +19,6 @@
package org.apache.samza.storage.kv
-import org.apache.samza.util.Util.notNull
-
import scala.collection.JavaConverters._
object NullSafeKeyValueStore {
@@ -85,4 +83,10 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt
def close {
store.close
}
+
+ private def notNull[T](obj: T, msg: String) = { // guard: throws NullPointerException(msg) when obj is null, otherwise does nothing
+ if (obj == null) {
+ throw new NullPointerException(msg)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index ec63358..ab29b71 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -50,6 +50,8 @@ import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.ExponentialSleepStrategy;
+import org.apache.samza.util.HttpUtil;
import org.apache.samza.util.Util;
/**
@@ -267,9 +269,8 @@ public class StreamAppender extends AppenderSkeleton {
config = JobModelManager.currentJobModelManager().jobModel().getConfig();
} else {
String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
- config = SamzaObjectMapper.getObjectMapper()
- .readValue(Util.read(new URL(url), 30000), JobModel.class)
- .getConfig();
+ String response = HttpUtil.read(new URL(url), 30000, new ExponentialSleepStrategy());
+ config = SamzaObjectMapper.getObjectMapper().readValue(response, JobModel.class).getConfig();
}
} catch (IOException e) {
throw new SamzaException("can not read the config", e);
@@ -294,7 +295,7 @@ public class StreamAppender extends AppenderSkeleton {
String systemName = log4jSystemConfig.getSystemName();
String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName);
if (systemFactoryName != null) {
- systemFactory = Util.<SystemFactory>getObj(systemFactoryName);
+ systemFactory = Util.getObj(systemFactoryName, SystemFactory.class);
} else {
throw new SamzaException("Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use");
}
@@ -388,7 +389,7 @@ public class StreamAppender extends AppenderSkeleton {
}
if (serdeClass != null) {
- SerdeFactory<LoggingEvent> serdeFactory = Util.<SerdeFactory<LoggingEvent>>getObj(serdeClass);
+ SerdeFactory<LoggingEvent> serdeFactory = Util.getObj(serdeClass, SerdeFactory.class);
serde = serdeFactory.getSerde(systemName, config);
} else {
String serdeKey = String.format(SerializerConfig.SERDE_FACTORY_CLASS(), serdeName);
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
index ff1268c..7ca9b35 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
@@ -19,7 +19,7 @@
package org.apache.samza.monitor;
import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.Util;
/**
* Helper class that instantiates the Monitor.
@@ -30,7 +30,7 @@ public class MonitorLoader {
throws InstantiationException {
String factoryClass = monitorConfig.getMonitorFactoryClass();
try {
- MonitorFactory monitorFactory = ClassLoaderHelper.fromClassName(factoryClass);
+ MonitorFactory monitorFactory = Util.getObj(factoryClass, MonitorFactory.class);
return monitorFactory.getMonitorInstance(monitorName, monitorConfig, metricsRegistry);
} catch (Exception e) {
throw (InstantiationException)
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
index a6e0bb0..45b6a39 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
@@ -24,7 +24,7 @@ import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.rest.resources.DefaultResourceFactory;
import org.apache.samza.rest.resources.ResourceFactory;
-import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.Util;
import org.codehaus.jackson.jaxrs.JacksonJsonProvider;
import org.glassfish.jersey.server.ResourceConfig;
import org.slf4j.Logger;
@@ -84,7 +84,7 @@ public class SamzaRestApplication extends ResourceConfig {
private Collection<? extends Object> instantiateFactoryResources(String factoryClassName, Config config)
throws InstantiationException {
try {
- ResourceFactory factory = ClassLoaderHelper.<ResourceFactory>fromClassName(factoryClassName);
+ ResourceFactory factory = Util.getObj(factoryClassName, ResourceFactory.class);
return factory.getResourceInstances(config);
} catch (Exception e) {
throw (InstantiationException)
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
index 492385f..19e006f 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
@@ -27,7 +27,7 @@ import org.apache.samza.SamzaException;
import org.apache.samza.rest.model.Job;
import org.apache.samza.rest.model.JobStatus;
import org.apache.samza.rest.resources.JobsResourceConfig;
-import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,10 +47,10 @@ public abstract class AbstractJobProxy implements JobProxy {
* @return the JobProxy produced by the factory.
*/
public static JobProxy fromFactory(JobsResourceConfig config) {
- String jobProxyFactory = config.getJobProxyFactory();
- if (jobProxyFactory != null && !jobProxyFactory.isEmpty()) {
+ String jobProxyFactoryClassName = config.getJobProxyFactory();
+ if (jobProxyFactoryClassName != null && !jobProxyFactoryClassName.isEmpty()) {
try {
- JobProxyFactory factory = ClassLoaderHelper.fromClassName(jobProxyFactory);
+ JobProxyFactory factory = Util.getObj(jobProxyFactoryClassName, JobProxyFactory.class);
return factory.getJobProxy(config);
} catch (Exception e) {
throw new SamzaException(e);
http://git-wip-us.apache.org/repos/asf/samza/blob/5d73ecda/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
index fbddb30..fd8709f 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
@@ -20,13 +20,14 @@ package org.apache.samza.rest.proxy.job;
import java.util.Set;
import org.apache.samza.SamzaException;
+import org.apache.samza.config.ConfigFactory;
import org.apache.samza.rest.model.JobStatus;
import org.apache.samza.rest.model.yarn.YarnApplicationInfo;
import org.apache.samza.rest.proxy.installation.InstallationFinder;
import org.apache.samza.rest.proxy.installation.InstallationRecord;
import org.apache.samza.rest.proxy.installation.SimpleInstallationFinder;
import org.apache.samza.rest.resources.JobsResourceConfig;
-import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +50,7 @@ public class SimpleYarnJobProxy extends ScriptJobProxy {
public SimpleYarnJobProxy(JobsResourceConfig config) throws Exception {
super(config);
this.installFinder = new SimpleInstallationFinder(config.getInstallationsPath(),
- ClassLoaderHelper.fromClassName(config.getJobConfigFactory()));
+ Util.getObj(config.getJobConfigFactory(), ConfigFactory.class));
this.statusProvider = new YarnRestJobStatusProvider(config);
}