You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/03/04 00:31:02 UTC

samza git commit: Misc. minor cleanup.

Repository: samza
Updated Branches:
  refs/heads/master bc0a47b7c -> 1dfc5cecd


Misc. minor cleanup.

1. Added a meaningful name for the container thread pool threads.
2. Made the thread names for framework threads consistent.
3. Made a couple of monitoring/metrics threads daemon.
4. Fixed a few checkstyle warnings about missing param/throws documentation.

Author: Prateek Maheshwari <pm...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>, Jacob M <jm...@apache.org>

Closes #433 from prateekm/container-thread-pool-name


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1dfc5cec
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1dfc5cec
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1dfc5cec

Branch: refs/heads/master
Commit: 1dfc5cecdfa358851f01b4ef105ccb0f62e9891c
Parents: bc0a47b
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Sat Mar 3 16:30:59 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Sat Mar 3 16:30:59 2018 -0800

----------------------------------------------------------------------
 .../disk/PollingScanDiskSpaceMonitor.java       | 20 ++++------
 .../container/host/StatisticsMonitorImpl.java   | 20 ++--------
 .../stream/CoordinatorStreamManager.java        |  2 +-
 .../apache/samza/container/SamzaContainer.scala | 11 ++++--
 .../org/apache/samza/metrics/JvmMetrics.scala   | 24 +++++-------
 .../reporter/MetricsSnapshotReporter.scala      | 31 ++++++----------
 .../filereader/FileReaderSystemConsumer.scala   | 27 ++++++--------
 .../apache/samza/util/DaemonThreadFactory.scala | 39 --------------------
 .../samza/util/TestDaemonThreadFactory.scala    | 37 -------------------
 .../apache/samza/system/kafka/BrokerProxy.scala | 12 +-----
 .../interfaces/RelSchemaProviderFactory.java    |  3 +-
 .../interfaces/SamzaRelConverterFactory.java    |  7 ++--
 .../apache/samza/tools/GenerateKafkaEvents.java |  4 +-
 .../job/yarn/YarnClusterResourceManager.java    |  2 +-
 .../yarn/SamzaAppMasterSecurityManager.scala    | 29 +++++++++------
 .../yarn/SamzaContainerSecurityManager.scala    | 34 ++++++++++-------
 16 files changed, 101 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java b/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
index 75e461d..2ae8545 100644
--- a/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
+++ b/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
@@ -18,6 +18,8 @@
  */
 package org.apache.samza.container.disk;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,9 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * An implementation of {@link DiskSpaceMonitor} that polls for disk usage based on a specified
@@ -47,7 +47,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class PollingScanDiskSpaceMonitor implements DiskSpaceMonitor {
   private enum State { INIT, RUNNING, STOPPED }
 
-  private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryImpl();
   private static final Logger log = LoggerFactory.getLogger(PollingScanDiskSpaceMonitor.class);
 
   // Note: we use this as a set where the value is always Boolean.TRUE.
@@ -57,7 +56,11 @@ public class PollingScanDiskSpaceMonitor implements DiskSpaceMonitor {
   private final Object lock = new Object();
 
   private final ScheduledExecutorService schedulerService =
-      Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
+      Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder()
+              .setNameFormat("Samza PollingScanDiskSpaceMonitor Thread-%d")
+              .setDaemon(true)
+              .build());
   private final Set<Path> watchPaths;
   private final long pollingIntervalMillis;
 
@@ -197,13 +200,4 @@ public class PollingScanDiskSpaceMonitor implements DiskSpaceMonitor {
       }
     }
   }
-
-  private static class ThreadFactoryImpl implements ThreadFactory  {
-    private static final String PREFIX = "Samza-" + PollingScanDiskSpaceMonitor.class.getSimpleName() + "-";
-    private static final AtomicInteger INSTANCE_NUM = new AtomicInteger();
-
-    public Thread newThread(Runnable runnable) {
-      return new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement());
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java b/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
index 3dfdf36..23704b4 100644
--- a/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
@@ -18,6 +18,8 @@
  */
 package org.apache.samza.container.host;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,9 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * An implementation of {@link SystemStatisticsMonitor} for unix and mac platforms. Users can implement their own
@@ -42,8 +42,6 @@ import java.util.concurrent.atomic.AtomicInteger;
  * This class is thread-safe.
  */
 public class StatisticsMonitorImpl implements SystemStatisticsMonitor {
-
-  private static final ThreadFactory THREAD_FACTORY = new StatisticsMonitorThreadFactory();
   private static final Logger LOG = LoggerFactory.getLogger(StatisticsMonitorImpl.class);
 
   /**
@@ -60,7 +58,8 @@ public class StatisticsMonitorImpl implements SystemStatisticsMonitor {
 
   // Single threaded executor to handle callback invocations.
   private final ScheduledExecutorService schedulerService =
-      Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
+      Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder().setNameFormat("Samza StatisticsMonitor Thread-%d").setDaemon(true).build());
 
   // Use this as a set with value always set to True
   private final ConcurrentMap<StatisticsMonitorImpl.Listener, Boolean> listenerSet = new ConcurrentHashMap<>();
@@ -174,15 +173,4 @@ public class StatisticsMonitorImpl implements SystemStatisticsMonitor {
       }
     }
   }
-
-  // A convenience class that provides named threads
-  private static class StatisticsMonitorThreadFactory implements ThreadFactory {
-    private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger();
-    private static final String PREFIX = "Samza-StatisticsMonitor-Thread-";
-
-    @Override
-    public Thread newThread(Runnable runnable) {
-      return new Thread(runnable, PREFIX + INSTANCE_COUNT.getAndIncrement());
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamManager.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamManager.java
index f6e68b5..4edc1a5 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamManager.java
@@ -79,7 +79,7 @@ public class CoordinatorStreamManager {
   /**
    * Register source with the coordinator stream.
    *
-   * @param source
+   * @param source source to register with the coordinator stream
    */
   public void register(String source) {
     if (coordinatorStreamConsumer != null) {

http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 9b18044..bef5b41 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -26,6 +26,7 @@ import java.util
 import java.util.Base64
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.MetricsConfig.Config2Metrics
@@ -405,10 +406,14 @@ object SamzaContainer extends Logging {
     val threadPoolSize = config.getThreadPoolSize
     info("Got thread pool size: " + threadPoolSize)
 
-    val taskThreadPool = if (!singleThreadMode && threadPoolSize > 0)
-      Executors.newFixedThreadPool(threadPoolSize)
-    else
+
+    val taskThreadPool = if (!singleThreadMode && threadPoolSize > 0) {
+      Executors.newFixedThreadPool(threadPoolSize,
+        new ThreadFactoryBuilder().setNameFormat("Samza Container Thread-%d").build())
+    } else {
       null
+    }
+
 
     val finalTaskFactory = TaskFactoryUtil.finalizeTaskFactory(
       taskFactory,

http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
index f26bd2c..7cc1452 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
@@ -19,23 +19,18 @@
 
 package org.apache.samza.metrics
 
-import scala.collection._
-import scala.collection.JavaConverters._
-import java.lang.management.ManagementFactory
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import com.sun.management.OperatingSystemMXBean
+import com.sun.management.UnixOperatingSystemMXBean
+import org.apache.samza.util.Logging
+
 import java.lang.Thread.State._
+import java.lang.management.ManagementFactory
 import java.util.concurrent.Executors
 import java.util.concurrent.TimeUnit
 
-import com.sun.management.{OperatingSystemMXBean, UnixOperatingSystemMXBean}
-import org.apache.samza.util.Logging
-import org.apache.samza.util.DaemonThreadFactory
-
-/**
- *  Companion object for class JvmMetrics encapsulating various constants
- */
-object JvmMetrics {
-  val JVM_METRICS_THREAD_NAME_PREFIX = "JVM-METRICS"
-}
+import scala.collection.JavaConverters._
+import scala.collection._
 
 /**
  * Straight up ripoff of Hadoop's metrics2 JvmMetrics class.
@@ -49,7 +44,8 @@ class JvmMetrics(val registry: MetricsRegistry) extends MetricsHelper with Runna
   val threadMXBean = ManagementFactory.getThreadMXBean()
   val osMXBean = ManagementFactory.getOperatingSystemMXBean()
   var gcBeanCounters = Map[String, (Counter, Counter)]()
-  val executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory(JvmMetrics.JVM_METRICS_THREAD_NAME_PREFIX))
+  val executor = Executors.newSingleThreadScheduledExecutor(
+    new ThreadFactoryBuilder().setNameFormat("Samza JvmMetrics Thread-%d").setDaemon(true).build())
 
   // jvm metrics
   val gMemNonHeapUsedM = newGauge("mem-non-heap-used-mb", 0.0F)

http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
index 945ae47..65ca49c 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
@@ -19,30 +19,20 @@
 
 package org.apache.samza.metrics.reporter
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.samza.metrics._
+import org.apache.samza.serializers.Serializer
+import org.apache.samza.system.OutgoingMessageEnvelope
+import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemStream
+import org.apache.samza.util.Logging
+
 import java.util.HashMap
 import java.util.Map
-import scala.collection.JavaConverters._
-import org.apache.samza.util.Logging
-import org.apache.samza.metrics.Counter
-import org.apache.samza.metrics.Gauge
-import org.apache.samza.metrics.Timer
-import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.metrics.MetricsVisitor
-import org.apache.samza.metrics.ReadableMetricsRegistry
 import java.util.concurrent.Executors
-import org.apache.samza.util.DaemonThreadFactory
 import java.util.concurrent.TimeUnit
-import org.apache.samza.serializers.Serializer
-import org.apache.samza.system.SystemProducer
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.OutgoingMessageEnvelope
 
-/**
- *  Companion object for class MetricsSnapshotReporter encapsulating various constants
- */
-object MetricsSnapshotReporter {
-  val METRIC_SNAPSHOT_REPORTER_THREAD_NAME_PREFIX = "METRIC-SNAPSHOT-REPORTER"
-}
+import scala.collection.JavaConverters._
 
 /**
  * MetricsSnapshotReporter is a generic metrics reporter that sends metrics to a stream.
@@ -66,7 +56,8 @@ class MetricsSnapshotReporter(
   serializer: Serializer[MetricsSnapshot] = null,
   clock: () => Long = () => { System.currentTimeMillis }) extends MetricsReporter with Runnable with Logging {
 
-  val executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory(MetricsSnapshotReporter.METRIC_SNAPSHOT_REPORTER_THREAD_NAME_PREFIX))
+  val executor = Executors.newSingleThreadScheduledExecutor(
+    new ThreadFactoryBuilder().setNameFormat("Samza MetricsSnapshotReporter Thread-%d").setDaemon(true).build())
   val resetTime = clock()
   var registries = List[(String, ReadableMetricsRegistry)]()
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
index 84dd6b4..ad7577c 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
@@ -19,24 +19,20 @@
 
 package org.apache.samza.system.filereader
 
-import org.apache.samza.util.BlockingEnvelopeMap
+import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemStreamPartition
-import scala.collection.mutable.Map
+import org.apache.samza.util.BlockingEnvelopeMap
+import org.apache.samza.util.Logging
+
 import java.io.RandomAccessFile
-import org.apache.samza.system.IncomingMessageEnvelope
-import java.util.concurrent.LinkedBlockingQueue
-import java.util.concurrent.Executors
 import java.util.concurrent.ExecutorService
-import org.apache.samza.util.DaemonThreadFactory
-import org.apache.samza.util.Logging
+import java.util.concurrent.Executors
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.mutable.Map
 
-object FileReaderSystemConsumer {
-  /**
-   * prefix for the file reader system thread names
-   */
-  val FILE_READER_SYSTEM_THREAD_PREFIX = "filereader-"
-}
 
 class FileReaderSystemConsumer(
   systemName: String,
@@ -77,8 +73,9 @@ class FileReaderSystemConsumer(
    * start one thread for each file reader
    */
   override def start {
-    pool = Executors.newFixedThreadPool(systemStreamPartitionAndStartingOffset.size, new DaemonThreadFactory(FileReaderSystemConsumer.FILE_READER_SYSTEM_THREAD_PREFIX))
-    systemStreamPartitionAndStartingOffset.map { case (ssp, offset) => pool.execute(readInputFiles(ssp, offset)) }
+    pool = Executors.newFixedThreadPool(systemStreamPartitionAndStartingOffset.size,
+      new ThreadFactoryBuilder().setNameFormat("Samza FileReader Thread-%d").setDaemon(true).build())
+    systemStreamPartitionAndStartingOffset.foreach { case (ssp, offset) => pool.execute(readInputFiles(ssp, offset)) }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala b/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala
deleted file mode 100644
index d2015ab..0000000
--- a/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.util
-
-import java.util.concurrent.ThreadFactory
-
-
-object ThreadNamePrefix {
-  val SAMZA_THREAD_NAME_PREFIX = "SAMZA-"
-}
-
-class DaemonThreadFactory(name: String) extends ThreadFactory {
-
-  def newThread(r: Runnable) = {
-    val thread = new Thread(r)
-    thread.setDaemon(true)
-    if (name.nonEmpty) {
-      thread.setName(ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX+name)
-    }
-    thread
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala b/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
deleted file mode 100644
index ee56e20..0000000
--- a/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.util
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestDaemonThreadFactory {
-  @Test
-  def testDaemonThreadFactoryCanCreatThreadGivenName() {
-    val testThreadName = "JvmMetrics"
-    val dtf = new DaemonThreadFactory(testThreadName)
-    val threadWithName = dtf.newThread(new Runnable {
-      def run() {
-        // Not testing this particular method
-      }
-    })
-    assertEquals(ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX + testThreadName, threadWithName.getName)
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 8a6618d..e5482a9 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -31,19 +31,11 @@ import kafka.consumer.ConsumerConfig
 import kafka.message.MessageSet
 import org.apache.samza.SamzaException
 import org.apache.samza.util.ExponentialSleepStrategy
+import org.apache.samza.util.KafkaUtil
 import org.apache.samza.util.Logging
-import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX
 
 import scala.collection.JavaConverters._
 import scala.collection.concurrent
-import org.apache.samza.util.KafkaUtil
-
-/**
- *  Companion object for class JvmMetrics encapsulating various constants
- */
-object BrokerProxy {
-  val BROKER_PROXY_THREAD_NAME_PREFIX = "BROKER-PROXY-"
-}
 
 /**
  * A BrokerProxy consolidates Kafka fetches meant for a particular broker and retrieves them all at once, providing
@@ -294,7 +286,7 @@ class BrokerProxy(
     if (!thread.isAlive) {
       info("Starting " + toString)
       thread.setDaemon(true)
-      thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + thread.getName)
+      thread.setName("Samza BrokerProxy " + thread.getName)
       thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
         override def uncaughtException(t: Thread, e: Throwable) = error("Uncaught exception in broker proxy:", e)
       })

http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java
index c614cdf..f9e7cd6 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java
@@ -26,7 +26,8 @@ public interface RelSchemaProviderFactory {
 
   /**
    * Create a {@link RelSchemaProvider} given the config
-   * @param config Config needed to create the {@link RelSchemaProvider}
+   * @param systemStream the system stream to create the {@link RelSchemaProvider} for
+   * @param config config needed to create the {@link RelSchemaProvider}
    * @return {@link RelSchemaProvider} object created.
    */
   RelSchemaProvider create(SystemStream systemStream, Config config);

http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java
index f239df6..0a6f275 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java
@@ -31,9 +31,10 @@ public interface SamzaRelConverterFactory {
   /**
    * Create a {@link SamzaRelConverter}. This method is called when the framework wants to create the
    * {@link SamzaRelConverter} corresponding to the system.
-   * @param config
-   *  config that is used to create the object
-   * @return Returns the object created.
+   * @param systemStream the systemStream to create a converter for
+   * @param relSchemaProvider the relational schema provider
+   * @param config config that is used to create the object
+   * @return the object created.
    */
   SamzaRelConverter create(SystemStream systemStream, RelSchemaProvider relSchemaProvider, Config config);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-tools/src/main/java/org/apache/samza/tools/GenerateKafkaEvents.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/GenerateKafkaEvents.java b/samza-tools/src/main/java/org/apache/samza/tools/GenerateKafkaEvents.java
index 6c30eee..2beef06 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/GenerateKafkaEvents.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/GenerateKafkaEvents.java
@@ -172,11 +172,11 @@ public class GenerateKafkaEvents {
   /**
    * Encode an Avro record into byte array
    *
-   * @param clazz The class type of the Avro record
+   * @param clazz The class type of the avro record
    * @param record the instance of the avro record
    * @param <T> The type of the avro record.
    * @return encoded bytes
-   * @throws java.io.IOException
+   * @throws IOException on I/O errors encoding the avro record
    */
   public static <T> byte[] encodeAvroSpecificRecord(Class<T> clazz, T record) throws IOException {
     DatumWriter<T> msgDatumWriter = new SpecificDatumWriter<>(clazz);

http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 9be8475..695b35a 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -539,7 +539,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
    * @param samzaContainerId id of the samza Container to run (passed as a command line parameter to the process)
    * @param container the samza container to run.
    * @param cmdBuilder the command builder that encapsulates the command, and the context
-   *
+   * @throws IOException on IO exceptions running the container
    */
   public void runContainer(String samzaContainerId, Container container, CommandBuilder cmdBuilder) throws IOException {
     String containerIdStr = ConverterUtils.toString(container.getId());

http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala
index 8dba96a..185cda0 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala
@@ -19,20 +19,22 @@
 
 package org.apache.samza.job.yarn
 
-import java.security.PrivilegedExceptionAction
-import java.util.concurrent.{TimeUnit, Executors}
-
+import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, FileSystem}
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{YarnConfig, Config}
-import org.apache.samza.util.{DaemonThreadFactory, Logging}
+import org.apache.samza.config.Config
+import org.apache.samza.config.YarnConfig
 import org.apache.samza.container.SecurityManager
+import org.apache.samza.util.Logging
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
 
-object SamzaAppMasterSecurityManager {
-  val TOKEN_RENEW_THREAD_NAME_PREFIX = "TOKEN-RENEW-PREFIX"
-}
 
 /**
   * The SamzaAppMasterSecurityManager is responsible for renewing and distributing HDFS delegation tokens on a secure YARN
@@ -47,8 +49,11 @@ object SamzaAppMasterSecurityManager {
   * @param hadoopConf the hadoop configuration
   */
 class SamzaAppMasterSecurityManager(config: Config, hadoopConf: Configuration) extends SecurityManager with Logging {
-  private val tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory(SamzaAppMasterSecurityManager
-    .TOKEN_RENEW_THREAD_NAME_PREFIX))
+  private val tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(
+    new ThreadFactoryBuilder()
+      .setNameFormat("Samza AMSecurityManager TokenRenewer Thread-%d")
+      .setDaemon(true)
+      .build())
 
   def start() = {
     val yarnConfig = new YarnConfig(config)

http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala
index 10b971f..fce840c 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala
@@ -19,24 +19,30 @@
 
 package org.apache.samza.job.yarn
 
-import java.util.concurrent.{TimeUnit, Executors}
-
+import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.{UserGroupInformation, Credentials}
-import org.apache.samza.config.{Config, YarnConfig}
-import org.apache.samza.util.{Logging, DaemonThreadFactory}
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.samza.config.Config
+import org.apache.samza.config.YarnConfig
 import org.apache.samza.container.SecurityManager
+import org.apache.samza.util.Logging
 
-object SamzaContainerSecurityManager {
-  val TOKEN_RENEW_THREAD_NAME_PREFIX = "TOKEN-RENEW-PREFIX"
-  val INITIAL_DELAY_IN_SECONDS = 60
-}
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
 
 class SamzaContainerSecurityManager(config: Config, hadoopConfig: Configuration) extends SecurityManager with Logging {
-  private val tokenRenewExecutor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory(SamzaContainerSecurityManager.TOKEN_RENEW_THREAD_NAME_PREFIX))
-  private var lastRefreshTimestamp = 0L
+  private val InitialDelayInSeconds = 60
+  
+  private val tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(
+    new ThreadFactoryBuilder()
+      .setNameFormat("Samza ContainerSecurityManager TokenRenewer Thread-%d")
+      .setDaemon(true)
+      .build())
 
+  private var lastRefreshTimestamp = 0L
 
   def start() = {
     val yarnConfig = new YarnConfig(config)
@@ -75,8 +81,8 @@ class SamzaContainerSecurityManager(config: Config, hadoopConfig: Configuration)
       }
     }
 
-    info(s"Schedule the next fetch in ${renewalInterval + SamzaContainerSecurityManager.INITIAL_DELAY_IN_SECONDS} seconds")
-    tokenRenewExecutor.schedule(tokenRenewRunnable, renewalInterval + SamzaContainerSecurityManager.INITIAL_DELAY_IN_SECONDS, TimeUnit.SECONDS)
+    info(s"Schedule the next fetch in ${renewalInterval + InitialDelayInSeconds} seconds")
+    tokenRenewExecutor.schedule(tokenRenewRunnable, renewalInterval + InitialDelayInSeconds, TimeUnit.SECONDS)
   }
 
   private def getCredentialsFromHDFS(fs: FileSystem, tokenPath: Path): Credentials = {