You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2016/12/24 01:04:03 UTC
[6/7] samza git commit: added configs for EI3 testing, thread naming,
cleanup
added configs for EI3 testing, thread naming, cleanup
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0a2b63a4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0a2b63a4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0a2b63a4
Branch: refs/heads/samza-standalone
Commit: 0a2b63a44ab45a7c3a9b6d8743a5839e7d08cba4
Parents: e37f910
Author: navina <na...@apache.org>
Authored: Fri Dec 23 17:02:45 2016 -0800
Committer: navina <na...@apache.org>
Committed: Fri Dec 23 17:02:45 2016 -0800
----------------------------------------------------------------------
.../apache/samza/processor/SamzaContainerController.java | 10 ++++++++--
.../java/org/apache/samza/processor/StreamProcessor.java | 1 +
.../org/apache/samza/zk/ScheduleAfterDebounceTime.java | 4 +++-
.../main/java/org/apache/samza/zk/ZkJobCoordinator.java | 1 +
samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java | 8 ++++++++
5 files changed, 21 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/0a2b63a4/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
index 9352f27..fb227d0 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
@@ -1,5 +1,6 @@
package org.apache.samza.processor;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.TaskConfigJava;
@@ -45,8 +46,10 @@ public class SamzaContainerController {
public SamzaContainerController (
Object taskFactory,
long containerShutdownMs,
+ String processorId,
Map<String, MetricsReporter> metricsReporterMap) {
- this.executorService = Executors.newSingleThreadExecutor();
+ this.executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+ .setNameFormat("p" + processorId + "-container-thread-%d").build());
this.taskFactory = taskFactory;
this.metricsReporterMap = metricsReporterMap;
if (containerShutdownMs == -1) {
@@ -73,6 +76,7 @@ public class SamzaContainerController {
if (new ClusterManagerConfig(config).getHostAffinityEnabled()) {
localityManager = SamzaContainer$.MODULE$.getLocalityManager(containerModel.getContainerId(), config);
}
+ log.info("About to create container: " + containerModel.getContainerId());
container = SamzaContainer$.MODULE$.apply(
containerModel.getContainerId(),
containerModel,
@@ -82,6 +86,7 @@ public class SamzaContainerController {
new JmxServer(),
Util.<String, MetricsReporter>javaMapAsScalaMap(metricsReporterMap),
taskFactory);
+ log.info("About to start container: " + containerModel.getContainerId());
containerFuture = executorService.submit(() -> container.run());
}
@@ -109,7 +114,8 @@ public class SamzaContainerController {
container.shutdown();
try {
- containerFuture.get(containerShutdownMs, TimeUnit.MILLISECONDS);
+ if(containerFuture != null)
+ containerFuture.get(containerShutdownMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
log.error("Ran into problems while trying to stop the container in the processor!", e);
} catch (TimeoutException e) {
http://git-wip-us.apache.org/repos/asf/samza/blob/0a2b63a4/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 61795e1..596f8f4 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -121,6 +121,7 @@ public class StreamProcessor {
this.containerController = new SamzaContainerController(
taskFactory,
new TaskConfigJava(updatedConfig).getShutdownMs(),
+ String.valueOf(processorId),
customMetricsReporters);
this.jobCoordinator = Util.
http://git-wip-us.apache.org/repos/asf/samza/blob/0a2b63a4/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
index 1854de6..e217dab 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -1,5 +1,6 @@
package org.apache.samza.zk;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
@@ -19,7 +20,8 @@ public class ScheduleAfterDebounceTime {
public static final int DEBOUNCE_TIME_MS = 2000;
- private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("zk-debounce-thread-%d").setDaemon(true).build());
private final Map<String, ScheduledFuture> futureHandles = new HashMap<>();
public ScheduleAfterDebounceTime () {
http://git-wip-us.apache.org/repos/asf/samza/blob/0a2b63a4/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 560e19b..f661547 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -95,6 +95,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
@Override
public void stop() {
zkController.stop();
+ containerController.shutdown();
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/0a2b63a4/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 0be2b04..de8c213 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -185,7 +185,15 @@ public class ZkUtils {
}
public void close() {
+ try {
+ zkConnnection.close();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
zkClient.close();
+
+ if(debounceTimer != null)
+ debounceTimer.stopScheduler();
}
public void deleteRoot() {