You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@samza.apache.org by na...@apache.org on 2016/12/24 01:04:03 UTC

[6/7] samza git commit: added configs for EI3 testing, thread naming, cleanup

added configs for EI3 testing, thread naming, cleanup


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0a2b63a4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0a2b63a4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0a2b63a4

Branch: refs/heads/samza-standalone
Commit: 0a2b63a44ab45a7c3a9b6d8743a5839e7d08cba4
Parents: e37f910
Author: navina <na...@apache.org>
Authored: Fri Dec 23 17:02:45 2016 -0800
Committer: navina <na...@apache.org>
Committed: Fri Dec 23 17:02:45 2016 -0800

----------------------------------------------------------------------
 .../apache/samza/processor/SamzaContainerController.java  | 10 ++++++++--
 .../java/org/apache/samza/processor/StreamProcessor.java  |  1 +
 .../org/apache/samza/zk/ScheduleAfterDebounceTime.java    |  4 +++-
 .../main/java/org/apache/samza/zk/ZkJobCoordinator.java   |  1 +
 samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java |  8 ++++++++
 5 files changed, 21 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/0a2b63a4/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
index 9352f27..fb227d0 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
@@ -1,5 +1,6 @@
 package org.apache.samza.processor;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.TaskConfigJava;
@@ -45,8 +46,10 @@ public class SamzaContainerController {
   public SamzaContainerController (
       Object taskFactory,
       long containerShutdownMs,
+      String processorId,
       Map<String, MetricsReporter> metricsReporterMap) {
-    this.executorService = Executors.newSingleThreadExecutor();
+    this.executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+        .setNameFormat("p" + processorId + "-container-thread-%d").build());
     this.taskFactory = taskFactory;
     this.metricsReporterMap = metricsReporterMap;
     if (containerShutdownMs == -1) {
@@ -73,6 +76,7 @@ public class SamzaContainerController {
     if (new ClusterManagerConfig(config).getHostAffinityEnabled()) {
       localityManager = SamzaContainer$.MODULE$.getLocalityManager(containerModel.getContainerId(), config);
     }
+    log.info("About to create container: " + containerModel.getContainerId());
     container = SamzaContainer$.MODULE$.apply(
         containerModel.getContainerId(),
         containerModel,
@@ -82,6 +86,7 @@ public class SamzaContainerController {
         new JmxServer(),
         Util.<String, MetricsReporter>javaMapAsScalaMap(metricsReporterMap),
         taskFactory);
+    log.info("About to start container: " + containerModel.getContainerId());
     containerFuture = executorService.submit(() -> container.run());
   }
 
@@ -109,7 +114,8 @@ public class SamzaContainerController {
 
     container.shutdown();
     try {
-      containerFuture.get(containerShutdownMs, TimeUnit.MILLISECONDS);
+      if(containerFuture != null)
+        containerFuture.get(containerShutdownMs, TimeUnit.MILLISECONDS);
     } catch (InterruptedException | ExecutionException e) {
       log.error("Ran into problems while trying to stop the container in the processor!", e);
     } catch (TimeoutException e) {

http://git-wip-us.apache.org/repos/asf/samza/blob/0a2b63a4/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 61795e1..596f8f4 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -121,6 +121,7 @@ public class StreamProcessor {
     this.containerController = new SamzaContainerController(
         taskFactory,
         new TaskConfigJava(updatedConfig).getShutdownMs(),
+        String.valueOf(processorId),
         customMetricsReporters);
 
     this.jobCoordinator = Util.

http://git-wip-us.apache.org/repos/asf/samza/blob/0a2b63a4/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
index 1854de6..e217dab 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -1,5 +1,6 @@
 package org.apache.samza.zk;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Executors;
@@ -19,7 +20,8 @@ public class ScheduleAfterDebounceTime {
   public static final int DEBOUNCE_TIME_MS = 2000;
 
 
-  private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+  private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
+      new ThreadFactoryBuilder().setNameFormat("zk-debounce-thread-%d").setDaemon(true).build());
   private final Map<String, ScheduledFuture> futureHandles = new HashMap<>();
 
   public ScheduleAfterDebounceTime () {

http://git-wip-us.apache.org/repos/asf/samza/blob/0a2b63a4/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 560e19b..f661547 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -95,6 +95,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
   @Override
   public void stop() {
     zkController.stop();
+    containerController.shutdown();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/0a2b63a4/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 0be2b04..de8c213 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -185,7 +185,15 @@ public class ZkUtils {
   }
 
   public void close() {
+    try {
+      zkConnnection.close();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
     zkClient.close();
+
+    if(debounceTimer != null)
+      debounceTimer.stopScheduler();
   }
 
   public void deleteRoot() {