You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2019/02/07 17:00:47 UTC

[samza] branch master updated: SAMZA-2101: code cleanup and refactoring

This is an automated email from the ASF dual-hosted git repository.

jmakes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new a9ab7ab  SAMZA-2101: code cleanup and refactoring
     new 1bdbfc2  Merge pull request #910 from strkkk/SAMZA-2101
a9ab7ab is described below

commit a9ab7ab3d02f6b6a48fdb61589a1bbe46a4b6f70
Author: strkkk <an...@gmail.com>
AuthorDate: Thu Feb 7 13:38:35 2019 +0300

    SAMZA-2101: code cleanup and refactoring
---
 .../samza/autoscaling/deployer/ConfigManager.java  | 24 +++++++++++-----------
 .../apache/samza/autoscaling/utils/YarnUtil.java   | 18 ++++++++--------
 .../samza/autoscaling/utils/YarnUtilTest.java      |  2 +-
 3 files changed, 21 insertions(+), 23 deletions(-)

diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
index d709254..576be6a 100644
--- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
+++ b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
@@ -39,6 +39,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -102,7 +104,7 @@ public class ConfigManager {
     if (config.containsKey(pollingIntervalOpt)) {
       long pollingInterval = config.getLong(pollingIntervalOpt);
       if (pollingInterval <= 0) {
-        throw new IllegalArgumentException("polling interval cannot be a negative value");
+        throw new IllegalArgumentException("polling interval should be greater than 0");
       }
       this.interval = pollingInterval;
     } else {
@@ -119,7 +121,7 @@ public class ConfigManager {
    * Then it reacts accordingly based on the configuration that is being set.
    * The method the calls the start() method to initialized the system, runs in a infinite loop, and calls the stop() method at the end to stop the consumer and the system
    */
-  public void run() {
+  private void run() {
     start();
     try {
       while (true) {
@@ -139,7 +141,7 @@ public class ConfigManager {
   /**
    * Starts the system by starting the consumer
    */
-  public void start() {
+  private void start() {
     register();
     coordinatorStreamConsumer.start();
     coordinatorStreamIterator = coordinatorStreamConsumer.getStartIterator();
@@ -149,7 +151,7 @@ public class ConfigManager {
   /**
    * stops the consumer making the system ready to stop
    */
-  public void stop() {
+  private void stop() {
     coordinatorStreamConsumer.stop();
     coordinatorServerURL = null;
     yarnUtil.stop();
@@ -181,17 +183,15 @@ public class ConfigManager {
    * This method just reads the messages, and it does not react to them or change any configuration of the system.
    */
   private void skipUnreadMessages() {
-    processConfigMessages(new LinkedList<String>());
+    processConfigMessages(Collections.emptyList());
     log.info("Config manager skipped messages");
   }
 
   /**
    * This function reads all the messages with "set-config" type added to the coordinator stream since the last time the method was invoked
    */
-  public void processConfigMessages() {
-    List<String> keysToProcess = new LinkedList<>();
-    keysToProcess.add(YARN_CONTAINER_COUNT_OPT);
-    keysToProcess.add(SERVER_URL_OPT);
+  private void processConfigMessages() {
+    List<String> keysToProcess = Arrays.asList(YARN_CONTAINER_COUNT_OPT, SERVER_URL_OPT);
     processConfigMessages(keysToProcess);
   }
 
@@ -234,7 +234,7 @@ public class ConfigManager {
 
         //TODO: change the handlers to implement a common interface, to make them pluggable
       } catch (Exception e) {
-        log.debug("Error in reading a message, skipping message with key " + key);
+        log.error("Error in reading a message, skipping message with key " + key);
       }
 
     }
@@ -323,7 +323,7 @@ public class ConfigManager {
    *
    * @return current number of tasks in the job
    */
-  public int getCurrentNumTasks() {
+  private int getCurrentNumTasks() {
     int currentNumTasks = 0;
     for (ContainerModel containerModel : SamzaContainer.readJobModel(coordinatorServerURL, defaultReadJobModelDelayMs).getContainers().values()) {
       currentNumTasks += containerModel.getTasks().size();
@@ -337,7 +337,7 @@ public class ConfigManager {
    *
    * @return current number of containers in the job
    */
-  public int getCurrentNumContainers() {
+  private int getCurrentNumContainers() {
     return SamzaContainer.readJobModel(coordinatorServerURL, defaultReadJobModelDelayMs).getContainers().values().size();
   }
 
diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
index cab46b9..7331f61 100644
--- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
+++ b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
@@ -44,12 +44,12 @@ import java.util.Map;
  */
 public class YarnUtil {
   private static final Logger log = LoggerFactory.getLogger(YarnUtil.class);
-  private CloseableHttpClient httpclient;
-  private HttpHost rmServer;
-  private YarnClient yarnClient;
+  private final CloseableHttpClient httpClient;
+  private final HttpHost rmServer;
+  private final YarnClient yarnClient;
 
   public YarnUtil(String rmAddress, int rmPort) {
-    this.httpclient = HttpClientBuilder.create().build();
+    this.httpClient = HttpClientBuilder.create().build();
     this.rmServer = new HttpHost(rmAddress, rmPort, "http");
     log.info("setting rm server to : " + rmServer);
     YarnConfiguration hConfig = new YarnConfiguration();
@@ -70,17 +70,15 @@ public class YarnUtil {
 
     try {
       HttpGet getRequest = new HttpGet("/ws/v1/cluster/apps");
-      HttpResponse httpResponse = httpclient.execute(rmServer, getRequest);
+      HttpResponse httpResponse = httpClient.execute(rmServer, getRequest);
       String applications = EntityUtils.toString(httpResponse.getEntity());
       log.debug("applications: " + applications);
 
       List<Map<String, String>> applicationList = parseYarnApplications(applications);
       String name = jobName + "_" + jobID;
       for (Map<String, String> application : applicationList) {
-        if (application.containsKey("state") && application.containsKey("name") && application.containsKey("id")) {
-          if (application.get("state").toString().equals("RUNNING") && application.get("name").toString().equals(name)) {
-            return application.get("id").toString();
-          }
+        if ("RUNNING".equals(application.get("state")) && name.equals(application.get("name")) && application.containsKey("id")) {
+          return application.get("id");
         }
       }
     } catch (NullPointerException | IOException e) {
@@ -150,7 +148,7 @@ public class YarnUtil {
    */
   public void stop() {
     try {
-      httpclient.close();
+      httpClient.close();
     } catch (IOException e) {
       log.error("HTTP Client failed to close.", e);
     }
diff --git a/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java b/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java
index 97ccb2d..7b4b74e 100644
--- a/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java
+++ b/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information