You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2019/02/07 17:00:47 UTC
[samza] branch master updated: SAMZA-2101: code cleanup and
refactoring
This is an automated email from the ASF dual-hosted git repository.
jmakes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new a9ab7ab SAMZA-2101: code cleanup and refactoring
new 1bdbfc2 Merge pull request #910 from strkkk/SAMZA-2101
a9ab7ab is described below
commit a9ab7ab3d02f6b6a48fdb61589a1bbe46a4b6f70
Author: strkkk <an...@gmail.com>
AuthorDate: Thu Feb 7 13:38:35 2019 +0300
SAMZA-2101: code cleanup and refactoring
---
.../samza/autoscaling/deployer/ConfigManager.java | 24 +++++++++++-----------
.../apache/samza/autoscaling/utils/YarnUtil.java | 18 ++++++++--------
.../samza/autoscaling/utils/YarnUtilTest.java | 2 +-
3 files changed, 21 insertions(+), 23 deletions(-)
diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
index d709254..576be6a 100644
--- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
+++ b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
@@ -39,6 +39,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -102,7 +104,7 @@ public class ConfigManager {
if (config.containsKey(pollingIntervalOpt)) {
long pollingInterval = config.getLong(pollingIntervalOpt);
if (pollingInterval <= 0) {
- throw new IllegalArgumentException("polling interval cannot be a negative value");
+ throw new IllegalArgumentException("polling interval should be greater than 0");
}
this.interval = pollingInterval;
} else {
@@ -119,7 +121,7 @@ public class ConfigManager {
* Then it reacts accordingly based on the configuration that is being set.
* The method the calls the start() method to initialized the system, runs in a infinite loop, and calls the stop() method at the end to stop the consumer and the system
*/
- public void run() {
+ private void run() {
start();
try {
while (true) {
@@ -139,7 +141,7 @@ public class ConfigManager {
/**
* Starts the system by starting the consumer
*/
- public void start() {
+ private void start() {
register();
coordinatorStreamConsumer.start();
coordinatorStreamIterator = coordinatorStreamConsumer.getStartIterator();
@@ -149,7 +151,7 @@ public class ConfigManager {
/**
* stops the consumer making the system ready to stop
*/
- public void stop() {
+ private void stop() {
coordinatorStreamConsumer.stop();
coordinatorServerURL = null;
yarnUtil.stop();
@@ -181,17 +183,15 @@ public class ConfigManager {
* This method just reads the messages, and it does not react to them or change any configuration of the system.
*/
private void skipUnreadMessages() {
- processConfigMessages(new LinkedList<String>());
+ processConfigMessages(Collections.emptyList());
log.info("Config manager skipped messages");
}
/**
* This function reads all the messages with "set-config" type added to the coordinator stream since the last time the method was invoked
*/
- public void processConfigMessages() {
- List<String> keysToProcess = new LinkedList<>();
- keysToProcess.add(YARN_CONTAINER_COUNT_OPT);
- keysToProcess.add(SERVER_URL_OPT);
+ private void processConfigMessages() {
+ List<String> keysToProcess = Arrays.asList(YARN_CONTAINER_COUNT_OPT, SERVER_URL_OPT);
processConfigMessages(keysToProcess);
}
@@ -234,7 +234,7 @@ public class ConfigManager {
//TODO: change the handlers to implement a common interface, to make them pluggable
} catch (Exception e) {
- log.debug("Error in reading a message, skipping message with key " + key);
+ log.error("Error in reading a message, skipping message with key " + key);
}
}
@@ -323,7 +323,7 @@ public class ConfigManager {
*
* @return current number of tasks in the job
*/
- public int getCurrentNumTasks() {
+ private int getCurrentNumTasks() {
int currentNumTasks = 0;
for (ContainerModel containerModel : SamzaContainer.readJobModel(coordinatorServerURL, defaultReadJobModelDelayMs).getContainers().values()) {
currentNumTasks += containerModel.getTasks().size();
@@ -337,7 +337,7 @@ public class ConfigManager {
*
* @return current number of containers in the job
*/
- public int getCurrentNumContainers() {
+ private int getCurrentNumContainers() {
return SamzaContainer.readJobModel(coordinatorServerURL, defaultReadJobModelDelayMs).getContainers().values().size();
}
diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
index cab46b9..7331f61 100644
--- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
+++ b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
@@ -44,12 +44,12 @@ import java.util.Map;
*/
public class YarnUtil {
private static final Logger log = LoggerFactory.getLogger(YarnUtil.class);
- private CloseableHttpClient httpclient;
- private HttpHost rmServer;
- private YarnClient yarnClient;
+ private final CloseableHttpClient httpClient;
+ private final HttpHost rmServer;
+ private final YarnClient yarnClient;
public YarnUtil(String rmAddress, int rmPort) {
- this.httpclient = HttpClientBuilder.create().build();
+ this.httpClient = HttpClientBuilder.create().build();
this.rmServer = new HttpHost(rmAddress, rmPort, "http");
log.info("setting rm server to : " + rmServer);
YarnConfiguration hConfig = new YarnConfiguration();
@@ -70,17 +70,15 @@ public class YarnUtil {
try {
HttpGet getRequest = new HttpGet("/ws/v1/cluster/apps");
- HttpResponse httpResponse = httpclient.execute(rmServer, getRequest);
+ HttpResponse httpResponse = httpClient.execute(rmServer, getRequest);
String applications = EntityUtils.toString(httpResponse.getEntity());
log.debug("applications: " + applications);
List<Map<String, String>> applicationList = parseYarnApplications(applications);
String name = jobName + "_" + jobID;
for (Map<String, String> application : applicationList) {
- if (application.containsKey("state") && application.containsKey("name") && application.containsKey("id")) {
- if (application.get("state").toString().equals("RUNNING") && application.get("name").toString().equals(name)) {
- return application.get("id").toString();
- }
+ if ("RUNNING".equals(application.get("state")) && name.equals(application.get("name")) && application.containsKey("id")) {
+ return application.get("id");
}
}
} catch (NullPointerException | IOException e) {
@@ -150,7 +148,7 @@ public class YarnUtil {
*/
public void stop() {
try {
- httpclient.close();
+ httpClient.close();
} catch (IOException e) {
log.error("HTTP Client failed to close.", e);
}
diff --git a/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java b/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java
index 97ccb2d..7b4b74e 100644
--- a/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java
+++ b/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information