You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/03/04 00:31:02 UTC
samza git commit: Misc. minor cleanup.
Repository: samza
Updated Branches:
refs/heads/master bc0a47b7c -> 1dfc5cecd
Misc. minor cleanup.
1. Added a meaningful name for the container thread pool threads.
2. Made the thread names for framework threads consistent.
3. Made a couple of monitoring/metrics threads daemon.
4. Fixed a few checkstyle warnings about missing param/throws documentation.
Author: Prateek Maheshwari <pm...@linkedin.com>
Reviewers: Jagadish <ja...@apache.org>, Jacob M <jm...@apache.org>
Closes #433 from prateekm/container-thread-pool-name
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1dfc5cec
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1dfc5cec
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1dfc5cec
Branch: refs/heads/master
Commit: 1dfc5cecdfa358851f01b4ef105ccb0f62e9891c
Parents: bc0a47b
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Sat Mar 3 16:30:59 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Sat Mar 3 16:30:59 2018 -0800
----------------------------------------------------------------------
.../disk/PollingScanDiskSpaceMonitor.java | 20 ++++------
.../container/host/StatisticsMonitorImpl.java | 20 ++--------
.../stream/CoordinatorStreamManager.java | 2 +-
.../apache/samza/container/SamzaContainer.scala | 11 ++++--
.../org/apache/samza/metrics/JvmMetrics.scala | 24 +++++-------
.../reporter/MetricsSnapshotReporter.scala | 31 ++++++----------
.../filereader/FileReaderSystemConsumer.scala | 27 ++++++--------
.../apache/samza/util/DaemonThreadFactory.scala | 39 --------------------
.../samza/util/TestDaemonThreadFactory.scala | 37 -------------------
.../apache/samza/system/kafka/BrokerProxy.scala | 12 +-----
.../interfaces/RelSchemaProviderFactory.java | 3 +-
.../interfaces/SamzaRelConverterFactory.java | 7 ++--
.../apache/samza/tools/GenerateKafkaEvents.java | 4 +-
.../job/yarn/YarnClusterResourceManager.java | 2 +-
.../yarn/SamzaAppMasterSecurityManager.scala | 29 +++++++++------
.../yarn/SamzaContainerSecurityManager.scala | 34 ++++++++++-------
16 files changed, 101 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java b/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
index 75e461d..2ae8545 100644
--- a/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
+++ b/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
@@ -18,6 +18,8 @@
*/
package org.apache.samza.container.disk;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,9 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* An implementation of {@link DiskSpaceMonitor} that polls for disk usage based on a specified
@@ -47,7 +47,6 @@ import java.util.concurrent.atomic.AtomicInteger;
public class PollingScanDiskSpaceMonitor implements DiskSpaceMonitor {
private enum State { INIT, RUNNING, STOPPED }
- private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryImpl();
private static final Logger log = LoggerFactory.getLogger(PollingScanDiskSpaceMonitor.class);
// Note: we use this as a set where the value is always Boolean.TRUE.
@@ -57,7 +56,11 @@ public class PollingScanDiskSpaceMonitor implements DiskSpaceMonitor {
private final Object lock = new Object();
private final ScheduledExecutorService schedulerService =
- Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat("Samza PollingScanDiskSpaceMonitor Thread-%d")
+ .setDaemon(true)
+ .build());
private final Set<Path> watchPaths;
private final long pollingIntervalMillis;
@@ -197,13 +200,4 @@ public class PollingScanDiskSpaceMonitor implements DiskSpaceMonitor {
}
}
}
-
- private static class ThreadFactoryImpl implements ThreadFactory {
- private static final String PREFIX = "Samza-" + PollingScanDiskSpaceMonitor.class.getSimpleName() + "-";
- private static final AtomicInteger INSTANCE_NUM = new AtomicInteger();
-
- public Thread newThread(Runnable runnable) {
- return new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement());
- }
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java b/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
index 3dfdf36..23704b4 100644
--- a/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
@@ -18,6 +18,8 @@
*/
package org.apache.samza.container.host;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,9 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* An implementation of {@link SystemStatisticsMonitor} for unix and mac platforms. Users can implement their own
@@ -42,8 +42,6 @@ import java.util.concurrent.atomic.AtomicInteger;
* This class is thread-safe.
*/
public class StatisticsMonitorImpl implements SystemStatisticsMonitor {
-
- private static final ThreadFactory THREAD_FACTORY = new StatisticsMonitorThreadFactory();
private static final Logger LOG = LoggerFactory.getLogger(StatisticsMonitorImpl.class);
/**
@@ -60,7 +58,8 @@ public class StatisticsMonitorImpl implements SystemStatisticsMonitor {
// Single threaded executor to handle callback invocations.
private final ScheduledExecutorService schedulerService =
- Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("Samza StatisticsMonitor Thread-%d").setDaemon(true).build());
// Use this as a set with value always set to True
private final ConcurrentMap<StatisticsMonitorImpl.Listener, Boolean> listenerSet = new ConcurrentHashMap<>();
@@ -174,15 +173,4 @@ public class StatisticsMonitorImpl implements SystemStatisticsMonitor {
}
}
}
-
- // A convenience class that provides named threads
- private static class StatisticsMonitorThreadFactory implements ThreadFactory {
- private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger();
- private static final String PREFIX = "Samza-StatisticsMonitor-Thread-";
-
- @Override
- public Thread newThread(Runnable runnable) {
- return new Thread(runnable, PREFIX + INSTANCE_COUNT.getAndIncrement());
- }
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamManager.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamManager.java
index f6e68b5..4edc1a5 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamManager.java
@@ -79,7 +79,7 @@ public class CoordinatorStreamManager {
/**
* Register source with the coordinator stream.
*
- * @param source
+ * @param source source to register with the coordinator stream
*/
public void register(String source) {
if (coordinatorStreamConsumer != null) {
http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 9b18044..bef5b41 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -26,6 +26,7 @@ import java.util
import java.util.Base64
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
+import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.MetricsConfig.Config2Metrics
@@ -405,10 +406,14 @@ object SamzaContainer extends Logging {
val threadPoolSize = config.getThreadPoolSize
info("Got thread pool size: " + threadPoolSize)
- val taskThreadPool = if (!singleThreadMode && threadPoolSize > 0)
- Executors.newFixedThreadPool(threadPoolSize)
- else
+
+ val taskThreadPool = if (!singleThreadMode && threadPoolSize > 0) {
+ Executors.newFixedThreadPool(threadPoolSize,
+ new ThreadFactoryBuilder().setNameFormat("Samza Container Thread-%d").build())
+ } else {
null
+ }
+
val finalTaskFactory = TaskFactoryUtil.finalizeTaskFactory(
taskFactory,
http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
index f26bd2c..7cc1452 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
@@ -19,23 +19,18 @@
package org.apache.samza.metrics
-import scala.collection._
-import scala.collection.JavaConverters._
-import java.lang.management.ManagementFactory
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import com.sun.management.OperatingSystemMXBean
+import com.sun.management.UnixOperatingSystemMXBean
+import org.apache.samza.util.Logging
+
import java.lang.Thread.State._
+import java.lang.management.ManagementFactory
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
-import com.sun.management.{OperatingSystemMXBean, UnixOperatingSystemMXBean}
-import org.apache.samza.util.Logging
-import org.apache.samza.util.DaemonThreadFactory
-
-/**
- * Companion object for class JvmMetrics encapsulating various constants
- */
-object JvmMetrics {
- val JVM_METRICS_THREAD_NAME_PREFIX = "JVM-METRICS"
-}
+import scala.collection.JavaConverters._
+import scala.collection._
/**
* Straight up ripoff of Hadoop's metrics2 JvmMetrics class.
@@ -49,7 +44,8 @@ class JvmMetrics(val registry: MetricsRegistry) extends MetricsHelper with Runna
val threadMXBean = ManagementFactory.getThreadMXBean()
val osMXBean = ManagementFactory.getOperatingSystemMXBean()
var gcBeanCounters = Map[String, (Counter, Counter)]()
- val executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory(JvmMetrics.JVM_METRICS_THREAD_NAME_PREFIX))
+ val executor = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("Samza JvmMetrics Thread-%d").setDaemon(true).build())
// jvm metrics
val gMemNonHeapUsedM = newGauge("mem-non-heap-used-mb", 0.0F)
http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
index 945ae47..65ca49c 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
@@ -19,30 +19,20 @@
package org.apache.samza.metrics.reporter
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.samza.metrics._
+import org.apache.samza.serializers.Serializer
+import org.apache.samza.system.OutgoingMessageEnvelope
+import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemStream
+import org.apache.samza.util.Logging
+
import java.util.HashMap
import java.util.Map
-import scala.collection.JavaConverters._
-import org.apache.samza.util.Logging
-import org.apache.samza.metrics.Counter
-import org.apache.samza.metrics.Gauge
-import org.apache.samza.metrics.Timer
-import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.metrics.MetricsVisitor
-import org.apache.samza.metrics.ReadableMetricsRegistry
import java.util.concurrent.Executors
-import org.apache.samza.util.DaemonThreadFactory
import java.util.concurrent.TimeUnit
-import org.apache.samza.serializers.Serializer
-import org.apache.samza.system.SystemProducer
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.OutgoingMessageEnvelope
-/**
- * Companion object for class MetricsSnapshotReporter encapsulating various constants
- */
-object MetricsSnapshotReporter {
- val METRIC_SNAPSHOT_REPORTER_THREAD_NAME_PREFIX = "METRIC-SNAPSHOT-REPORTER"
-}
+import scala.collection.JavaConverters._
/**
* MetricsSnapshotReporter is a generic metrics reporter that sends metrics to a stream.
@@ -66,7 +56,8 @@ class MetricsSnapshotReporter(
serializer: Serializer[MetricsSnapshot] = null,
clock: () => Long = () => { System.currentTimeMillis }) extends MetricsReporter with Runnable with Logging {
- val executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory(MetricsSnapshotReporter.METRIC_SNAPSHOT_REPORTER_THREAD_NAME_PREFIX))
+ val executor = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("Samza MetricsSnapshotReporter Thread-%d").setDaemon(true).build())
val resetTime = clock()
var registries = List[(String, ReadableMetricsRegistry)]()
http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
index 84dd6b4..ad7577c 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
@@ -19,24 +19,20 @@
package org.apache.samza.system.filereader
-import org.apache.samza.util.BlockingEnvelopeMap
+import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.system.SystemStreamPartition
-import scala.collection.mutable.Map
+import org.apache.samza.util.BlockingEnvelopeMap
+import org.apache.samza.util.Logging
+
import java.io.RandomAccessFile
-import org.apache.samza.system.IncomingMessageEnvelope
-import java.util.concurrent.LinkedBlockingQueue
-import java.util.concurrent.Executors
import java.util.concurrent.ExecutorService
-import org.apache.samza.util.DaemonThreadFactory
-import org.apache.samza.util.Logging
+import java.util.concurrent.Executors
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.mutable.Map
-object FileReaderSystemConsumer {
- /**
- * prefix for the file reader system thread names
- */
- val FILE_READER_SYSTEM_THREAD_PREFIX = "filereader-"
-}
class FileReaderSystemConsumer(
systemName: String,
@@ -77,8 +73,9 @@ class FileReaderSystemConsumer(
* start one thread for each file reader
*/
override def start {
- pool = Executors.newFixedThreadPool(systemStreamPartitionAndStartingOffset.size, new DaemonThreadFactory(FileReaderSystemConsumer.FILE_READER_SYSTEM_THREAD_PREFIX))
- systemStreamPartitionAndStartingOffset.map { case (ssp, offset) => pool.execute(readInputFiles(ssp, offset)) }
+ pool = Executors.newFixedThreadPool(systemStreamPartitionAndStartingOffset.size,
+ new ThreadFactoryBuilder().setNameFormat("Samza FileReader Thread-%d").setDaemon(true).build())
+ systemStreamPartitionAndStartingOffset.foreach { case (ssp, offset) => pool.execute(readInputFiles(ssp, offset)) }
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala b/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala
deleted file mode 100644
index d2015ab..0000000
--- a/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.util
-
-import java.util.concurrent.ThreadFactory
-
-
-object ThreadNamePrefix {
- val SAMZA_THREAD_NAME_PREFIX = "SAMZA-"
-}
-
-class DaemonThreadFactory(name: String) extends ThreadFactory {
-
- def newThread(r: Runnable) = {
- val thread = new Thread(r)
- thread.setDaemon(true)
- if (name.nonEmpty) {
- thread.setName(ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX+name)
- }
- thread
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala b/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
deleted file mode 100644
index ee56e20..0000000
--- a/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.util
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestDaemonThreadFactory {
- @Test
- def testDaemonThreadFactoryCanCreatThreadGivenName() {
- val testThreadName = "JvmMetrics"
- val dtf = new DaemonThreadFactory(testThreadName)
- val threadWithName = dtf.newThread(new Runnable {
- def run() {
- // Not testing this particular method
- }
- })
- assertEquals(ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX + testThreadName, threadWithName.getName)
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 8a6618d..e5482a9 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -31,19 +31,11 @@ import kafka.consumer.ConsumerConfig
import kafka.message.MessageSet
import org.apache.samza.SamzaException
import org.apache.samza.util.ExponentialSleepStrategy
+import org.apache.samza.util.KafkaUtil
import org.apache.samza.util.Logging
-import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX
import scala.collection.JavaConverters._
import scala.collection.concurrent
-import org.apache.samza.util.KafkaUtil
-
-/**
- * Companion object for class JvmMetrics encapsulating various constants
- */
-object BrokerProxy {
- val BROKER_PROXY_THREAD_NAME_PREFIX = "BROKER-PROXY-"
-}
/**
* A BrokerProxy consolidates Kafka fetches meant for a particular broker and retrieves them all at once, providing
@@ -294,7 +286,7 @@ class BrokerProxy(
if (!thread.isAlive) {
info("Starting " + toString)
thread.setDaemon(true)
- thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + thread.getName)
+ thread.setName("Samza BrokerProxy " + thread.getName)
thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
override def uncaughtException(t: Thread, e: Throwable) = error("Uncaught exception in broker proxy:", e)
})
http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java
index c614cdf..f9e7cd6 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java
@@ -26,7 +26,8 @@ public interface RelSchemaProviderFactory {
/**
* Create a {@link RelSchemaProvider} given the config
- * @param config Config needed to create the {@link RelSchemaProvider}
+ * @param systemStream the system stream to create the {@link RelSchemaProvider} for
+ * @param config config needed to create the {@link RelSchemaProvider}
* @return {@link RelSchemaProvider} object created.
*/
RelSchemaProvider create(SystemStream systemStream, Config config);
http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java
index f239df6..0a6f275 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java
@@ -31,9 +31,10 @@ public interface SamzaRelConverterFactory {
/**
* Create a {@link SamzaRelConverter}. This method is called when the framework wants to create the
* {@link SamzaRelConverter} corresponding to the system.
- * @param config
- * config that is used to create the object
- * @return Returns the object created.
+ * @param systemStream the systemStream to create a converter for
+ * @param relSchemaProvider the relational schema provider
+ * @param config config that is used to create the object
+ * @return the object created.
*/
SamzaRelConverter create(SystemStream systemStream, RelSchemaProvider relSchemaProvider, Config config);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-tools/src/main/java/org/apache/samza/tools/GenerateKafkaEvents.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/GenerateKafkaEvents.java b/samza-tools/src/main/java/org/apache/samza/tools/GenerateKafkaEvents.java
index 6c30eee..2beef06 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/GenerateKafkaEvents.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/GenerateKafkaEvents.java
@@ -172,11 +172,11 @@ public class GenerateKafkaEvents {
/**
* Encode an Avro record into byte array
*
- * @param clazz The class type of the Avro record
+ * @param clazz The class type of the avro record
* @param record the instance of the avro record
* @param <T> The type of the avro record.
* @return encoded bytes
- * @throws java.io.IOException
+ * @throws IOException on I/O errors encoding the avro record
*/
public static <T> byte[] encodeAvroSpecificRecord(Class<T> clazz, T record) throws IOException {
DatumWriter<T> msgDatumWriter = new SpecificDatumWriter<>(clazz);
http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 9be8475..695b35a 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -539,7 +539,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
* @param samzaContainerId id of the samza Container to run (passed as a command line parameter to the process)
* @param container the samza container to run.
* @param cmdBuilder the command builder that encapsulates the command, and the context
- *
+ * @throws IOException on IO exceptions running the container
*/
public void runContainer(String samzaContainerId, Container container, CommandBuilder cmdBuilder) throws IOException {
String containerIdStr = ConverterUtils.toString(container.getId());
http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala
index 8dba96a..185cda0 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala
@@ -19,20 +19,22 @@
package org.apache.samza.job.yarn
-import java.security.PrivilegedExceptionAction
-import java.util.concurrent.{TimeUnit, Executors}
-
+import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, FileSystem}
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.samza.SamzaException
-import org.apache.samza.config.{YarnConfig, Config}
-import org.apache.samza.util.{DaemonThreadFactory, Logging}
+import org.apache.samza.config.Config
+import org.apache.samza.config.YarnConfig
import org.apache.samza.container.SecurityManager
+import org.apache.samza.util.Logging
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
-object SamzaAppMasterSecurityManager {
- val TOKEN_RENEW_THREAD_NAME_PREFIX = "TOKEN-RENEW-PREFIX"
-}
/**
* The SamzaAppMasterSecurityManager is responsible for renewing and distributing HDFS delegation tokens on a secure YARN
@@ -47,8 +49,11 @@ object SamzaAppMasterSecurityManager {
* @param hadoopConf the hadoop configuration
*/
class SamzaAppMasterSecurityManager(config: Config, hadoopConf: Configuration) extends SecurityManager with Logging {
- private val tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory(SamzaAppMasterSecurityManager
- .TOKEN_RENEW_THREAD_NAME_PREFIX))
+ private val tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat("Samza AMSecurityManager TokenRenewer Thread-%d")
+ .setDaemon(true)
+ .build())
def start() = {
val yarnConfig = new YarnConfig(config)
http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala
index 10b971f..fce840c 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala
@@ -19,24 +19,30 @@
package org.apache.samza.job.yarn
-import java.util.concurrent.{TimeUnit, Executors}
-
+import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.{UserGroupInformation, Credentials}
-import org.apache.samza.config.{Config, YarnConfig}
-import org.apache.samza.util.{Logging, DaemonThreadFactory}
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.samza.config.Config
+import org.apache.samza.config.YarnConfig
import org.apache.samza.container.SecurityManager
+import org.apache.samza.util.Logging
-object SamzaContainerSecurityManager {
- val TOKEN_RENEW_THREAD_NAME_PREFIX = "TOKEN-RENEW-PREFIX"
- val INITIAL_DELAY_IN_SECONDS = 60
-}
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
class SamzaContainerSecurityManager(config: Config, hadoopConfig: Configuration) extends SecurityManager with Logging {
- private val tokenRenewExecutor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory(SamzaContainerSecurityManager.TOKEN_RENEW_THREAD_NAME_PREFIX))
- private var lastRefreshTimestamp = 0L
+ private val InitialDelayInSeconds = 60
+
+ private val tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat("Samza ContainerSecurityManager TokenRenewer Thread-%d")
+ .setDaemon(true)
+ .build())
+ private var lastRefreshTimestamp = 0L
def start() = {
val yarnConfig = new YarnConfig(config)
@@ -75,8 +81,8 @@ class SamzaContainerSecurityManager(config: Config, hadoopConfig: Configuration)
}
}
- info(s"Schedule the next fetch in ${renewalInterval + SamzaContainerSecurityManager.INITIAL_DELAY_IN_SECONDS} seconds")
- tokenRenewExecutor.schedule(tokenRenewRunnable, renewalInterval + SamzaContainerSecurityManager.INITIAL_DELAY_IN_SECONDS, TimeUnit.SECONDS)
+ info(s"Schedule the next fetch in ${renewalInterval + InitialDelayInSeconds} seconds")
+ tokenRenewExecutor.schedule(tokenRenewRunnable, renewalInterval + InitialDelayInSeconds, TimeUnit.SECONDS)
}
private def getCredentialsFromHDFS(fs: FileSystem, tokenPath: Path): Credentials = {