Posted to commits@samza.apache.org by lh...@apache.org on 2020/02/06 23:31:22 UTC

[samza] branch 1.3.1 updated (a02c8c4 -> 17be867)

This is an automated email from the ASF dual-hosted git repository.

lhaiesp pushed a change to branch 1.3.1
in repository https://gitbox.apache.org/repos/asf/samza.git.


    from a02c8c4  SAMZA-2423: Heartbeat failure causes incorrect container shutdown (#1240)
     new 7d90cc2  Clean up unused org.apache.samza.autoscaling module (#1250)
     new 3c2863c  Fix the coordinator stream creation workflow.
     new e250678  SAMZA-2431: Fix the checkpoint and changelog topic auto-creation. (#1251)
     new cccca7c  Fix the RocksDB TTL type conversion in change log properties generation. (#1254)
     new 964252a  SAMZA-2446: Invoke onCheckpoint only for registered SSPs (#1260)
     new 17be867  SAMZA-2447: Checkpoint dir removal should only search in valid store dirs (#1261)

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 build.gradle                                       |  40 ---
 .../java/org/apache/samza/system/StreamSpec.java   |   4 +-
 .../samza/autoscaling/deployer/ConfigManager.java  | 376 ---------------------
 .../apache/samza/autoscaling/utils/YarnUtil.java   | 158 ---------
 .../samza/autoscaling/utils/YarnUtilTest.java      |  38 ---
 .../resources/exampleResourceManagerOutput.json    | 121 -------
 .../apache/samza/checkpoint/OffsetManager.scala    |  17 +-
 .../TransactionalStateTaskStorageManager.scala     |  12 +-
 .../samza/checkpoint/TestOffsetManager.scala       |  10 +-
 .../TestTransactionalStateTaskStorageManager.java  |  27 ++
 .../samza/system/kafka/KafkaSystemAdmin.java       |  17 +-
 .../org/apache/samza/config/KafkaConfig.scala      |   6 +-
 .../system/kafka/TestKafkaSystemAdminJava.java     |  65 ++++
 .../org/apache/samza/config/TestKafkaConfig.scala  |  65 +++-
 samza-shell/src/main/bash/run-config-manager.sh    |  25 --
 settings.gradle                                    |   1 -
 sonar-project.properties                           |   2 +-
 17 files changed, 200 insertions(+), 784 deletions(-)
 delete mode 100644 samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
 delete mode 100644 samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
 delete mode 100644 samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java
 delete mode 100644 samza-autoscaling/src/test/resources/exampleResourceManagerOutput.json
 delete mode 100755 samza-shell/src/main/bash/run-config-manager.sh


[samza] 01/06: Clean up unused org.apache.samza.autoscaling module (#1250)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhaiesp pushed a commit to branch 1.3.1
in repository https://gitbox.apache.org/repos/asf/samza.git

commit 7d90cc239306636dccf337eec5808e5c0f0a5dc1
Author: Ke Wu <ke...@icloud.com>
AuthorDate: Tue Jan 14 12:03:24 2020 -0800

    Clean up unused org.apache.samza.autoscaling module (#1250)
    
    Issues: samza-autoscaling module is not used.
    
    Changes: remove the unused module.
    
    API Changes:
    None
    
    Upgrade Instructions:
    None
    
    Usage Instructions:
    None
    
    Tests: build
---
 build.gradle                                       |  40 ---
 .../samza/autoscaling/deployer/ConfigManager.java  | 376 ---------------------
 .../apache/samza/autoscaling/utils/YarnUtil.java   | 158 ---------
 .../samza/autoscaling/utils/YarnUtilTest.java      |  38 ---
 .../resources/exampleResourceManagerOutput.json    | 121 -------
 samza-shell/src/main/bash/run-config-manager.sh    |  25 --
 settings.gradle                                    |   1 -
 sonar-project.properties                           |   2 +-
 8 files changed, 1 insertion(+), 760 deletions(-)
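
For context on the removal below: the deleted ConfigManager was driven entirely by job configuration carrying the YARN ResourceManager coordinates and an optional polling interval (see the deleted javadoc in the diff). A minimal sketch of that wiring, using the config keys documented in the deleted code; the job name and the commented-out entry point are illustrative assumptions:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.samza.config.Config;
    import org.apache.samza.config.MapConfig;

    // Sketch only: these keys come from the deleted javadoc. The module is
    // removed by this commit, so ConfigManager no longer exists on this branch.
    Map<String, String> cfg = new HashMap<>();
    cfg.put("yarn.rm.address", "localhost");           // YARN RM host
    cfg.put("yarn.rm.port", "8088");                   // RM HTTP port
    cfg.put("configManager.polling.interval", "100");  // poll period, ms (default 100)
    cfg.put("job.name", "my-job");                     // hypothetical job name (required)
    Config config = new MapConfig(cfg);
    // new ConfigManager(config).run();                // former entry point, now gone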

diff --git a/build.gradle b/build.gradle
index 8a78ab6..07d6145 100644
--- a/build.gradle
+++ b/build.gradle
@@ -261,46 +261,6 @@ project(":samza-aws_$scalaSuffix") {
   }
 }
 
-
-project(":samza-autoscaling_$scalaSuffix") {
-  apply plugin: 'scala'
-  apply plugin: 'checkstyle'
-
-  // Force scala joint compilation
-  sourceSets.main.scala.srcDir "src/main/java"
-  sourceSets.test.scala.srcDir "src/test/java"
-
-  // Disable the Javac compiler by forcing joint compilation by scalac. This is equivalent to setting
-  // tasks.compileTestJava.enabled = false
-  sourceSets.main.java.srcDirs = []
-  sourceSets.test.java.srcDirs = []
-
-  dependencies {
-    compile project(':samza-api')
-    compile project(":samza-core_$scalaSuffix")
-    compile "org.scala-lang:scala-library:$scalaVersion"
-    compile "org.slf4j:slf4j-api:$slf4jVersion"
-    compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"
-    compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
-    compile "org.eclipse.jetty:jetty-webapp:$jettyVersion"
-    compile("org.apache.hadoop:hadoop-yarn-client:$yarnVersion") {
-      exclude module: 'servlet-api'
-    }
-    compile("org.apache.hadoop:hadoop-common:$yarnVersion") {
-      exclude module: 'servlet-api'
-    }
-    compile "org.apache.httpcomponents:httpclient:$httpClientVersion"
-    testCompile "junit:junit:$junitVersion"
-    testCompile "org.mockito:mockito-core:$mockitoVersion"
-    testCompile "org.scalatest:scalatest_$scalaSuffix:$scalaTestVersion"
-  }
-
-  checkstyle {
-    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
-    toolVersion = "$checkstyleVersion"
-  }
-}
-
 project(":samza-elasticsearch_$scalaSuffix") {
   apply plugin: 'java'
 
diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
deleted file mode 100644
index 1d319d6..0000000
--- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
+++ /dev/null
@@ -1,376 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-package org.apache.samza.autoscaling.deployer;
-
-import joptsimple.OptionSet;
-
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.samza.autoscaling.utils.YarnUtil;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.container.SamzaContainer;
-import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
-import org.apache.samza.coordinator.stream.messages.SetConfig;
-import org.apache.samza.job.JobRunner;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.system.SystemStreamPartitionIterator;
-import org.apache.samza.util.CommandLine;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-
-/**
- * This class is a separate module that runs alongside a job and handles all config changes submitted to the job after it has bootstrapped.
- * All config changes are written to the coordinator stream using the {@link CoordinatorStreamWriter}.
- * It works by reading all messages of type "set-config" written to the coordinator stream after
- * the bootstrap of the job and handling each message accordingly.
- * The configuration changes it currently handles are
- * 1. changing the number of containers of a job
- * 2. setting the server URL for the first time (in order to get the JobModel).
- * To use this class, call the run() method to react to changes,
- * or call the start(), processConfigMessages(), and stop() methods instead.
- * Additionally, you have to add the following configurations to the config file:
- * yarn.rm.address=localhost //the ip of the resource manager in yarn
- * yarn.rm.port=8088 //the port of the resource manager http server
- * The config manager will also periodically poll the coordinator stream to see if there are any new messages.
- * This period is 100 ms by default, but it can be configured by adding the following property to the input config file:
- * configManager.polling.interval=&lt; polling interval &gt;
- */
-
-public class ConfigManager {
-  private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
-  private SystemStreamPartitionIterator coordinatorStreamIterator;
-  private static final Logger log = LoggerFactory.getLogger(ConfigManager.class);
-  private final long defaultPollingInterval = 100;
-  private final int defaultReadJobModelDelayMs = 100;
-  private final long interval;
-  private String coordinatorServerURL = null;
-  private final String jobName;
-  private final int jobID;
-  private Config config;
-
-  private YarnUtil yarnUtil;
-
-  private final String rmAddressOpt = "yarn.rm.address";
-  private final String rmPortOpt = "yarn.rm.port";
-  private final String pollingIntervalOpt = "configManager.polling.interval";
-  private static final String SERVER_URL_OPT = "samza.autoscaling.server.url";
-  private static final String YARN_CONTAINER_COUNT_OPT = "yarn.container.count";
-
-  public ConfigManager(Config config) {
-
-    //get rm address and port
-    if (!config.containsKey(rmAddressOpt) || !config.containsKey(rmPortOpt)) {
-      throw new IllegalArgumentException("Missing config: the config file does not contain the rm host or port.");
-    }
-    String rmAddress = config.get(rmAddressOpt);
-    int rmPort = config.getInt(rmPortOpt);
-
-    //get job name and id;
-    if (!config.containsKey(JobConfig.JOB_NAME)) {
-      throw new IllegalArgumentException("Missing config: the config does not contain the job name");
-    }
-    jobName = config.get(JobConfig.JOB_NAME);
-    jobID = config.getInt(JobConfig.JOB_ID, 1);
-
-    //set polling interval
-    if (config.containsKey(pollingIntervalOpt)) {
-      long pollingInterval = config.getLong(pollingIntervalOpt);
-      if (pollingInterval <= 0) {
-        throw new IllegalArgumentException("polling interval should be greater than 0");
-      }
-      this.interval = pollingInterval;
-    } else {
-      this.interval = defaultPollingInterval;
-    }
-
-    this.config = config;
-    this.coordinatorStreamConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap());
-    this.yarnUtil = new YarnUtil(rmAddress, rmPort);
-  }
-
-  /**
-   * This method is an infinite loop that periodically checks whether there are any new messages in the job coordinator stream and reads them if they exist.
-   * It then reacts accordingly based on the configuration being set.
-   * The method calls start() to initialize the system, runs in an infinite loop, and calls stop() at the end to stop the consumer and the system.
-   */
-  private void run() {
-    start();
-    try {
-      while (true) {
-        Thread.sleep(interval);
-        processConfigMessages();
-      }
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-      log.warn("Got interrupt in config manager thread, so shutting down");
-      Thread.currentThread().interrupt();
-    } finally {
-      log.info("Stopping the config manager");
-      stop();
-    }
-  }
-
-  /**
-   * Starts the system by starting the consumer
-   */
-  private void start() {
-    register();
-    coordinatorStreamConsumer.start();
-    coordinatorStreamIterator = coordinatorStreamConsumer.getStartIterator();
-    bootstrap();
-  }
-
-  /**
-   * Stops the consumer, making the system ready to stop.
-   */
-  private void stop() {
-    coordinatorStreamConsumer.stop();
-    coordinatorServerURL = null;
-    yarnUtil.stop();
-  }
-
-  /**
-   * registers the consumer
-   */
-  private void register() {
-    coordinatorStreamConsumer.register();
-  }
-
-
-  /**
-   * This function bootstraps by reading all the unread messages up to the moment it is called, and thereby finds the server URL.
-   */
-  private void bootstrap() {
-    List<String> keysToProcess = new LinkedList<>();
-    keysToProcess.add(SERVER_URL_OPT);
-    processConfigMessages(keysToProcess);
-    if (coordinatorServerURL == null) {
-      throw new IllegalStateException("coordinator server url is null, while the bootstrap has finished ");
-    }
-    log.info("Config manager bootstrapped");
-  }
-
-  /**
-   * Skips all the unread messages up to the time this function is called.
-   * This method just reads the messages, and it does not react to them or change any configuration of the system.
-   */
-  private void skipUnreadMessages() {
-    processConfigMessages(Collections.emptyList());
-    log.info("Config manager skipped messages");
-  }
-
-  /**
-   * This function reads all the messages with "set-config" type added to the coordinator stream since the last time the method was invoked
-   */
-  private void processConfigMessages() {
-    List<String> keysToProcess = Arrays.asList(YARN_CONTAINER_COUNT_OPT, SERVER_URL_OPT);
-    processConfigMessages(keysToProcess);
-  }
-
-  /**
-   * This function reads all the messages with "set-config" type added to the coordinator stream since the last time the method was invoked
-   *
-   * @param keysToProcess a list of keys to process. Only messages with these keys will call their handler function,
-   *                      and other messages will be skipped. If the list is empty all messages will be skipped.
-   */
-  @SuppressWarnings("unchecked")
-  private void processConfigMessages(List<String> keysToProcess) {
-    if (!coordinatorStreamConsumer.hasNewMessages(coordinatorStreamIterator)) {
-      return;
-    }
-    if (keysToProcess == null) {
-      throw new IllegalArgumentException("The keys to process list is null");
-    }
-    for (CoordinatorStreamMessage message : coordinatorStreamConsumer.getUnreadMessages(coordinatorStreamIterator, SetConfig.TYPE)) {
-      String key = null;
-      try {
-        SetConfig setConfigMessage = new SetConfig(message);
-        key = setConfigMessage.getKey();
-        Map<String, String> valuesMap = (Map<String, String>) setConfigMessage.getMessageMap().get("values");
-        String value = null;
-        if (valuesMap != null) {
-          value = valuesMap.get("value");
-        }
-
-        log.debug("Received set-config message with key: " + key + " and value: " + value);
-
-        if (keysToProcess.contains(key)) {
-          if (key.equals(YARN_CONTAINER_COUNT_OPT)) {
-            handleYarnContainerChange(value);
-          } else if (key.equals(SERVER_URL_OPT)) {
-            handleServerURLChange(value);
-          } else {
-            log.info("Setting the " + key + " configuration is currently not supported, skipping the message");
-          }
-        }
-
-        //TODO: change the handlers to implement a common interface, to make them pluggable
-      } catch (Exception e) {
-        log.error("Error in reading a message, skipping message with key " + key);
-      }
-
-    }
-
-  }
-
-  /**
-   * This method handles setConfig messages that change the URL of the server the JobCoordinator has brought up.
-   *
-   * @param newServerURL the new value of the server URL
-   */
-  private void handleServerURLChange(String newServerURL) {
-    this.coordinatorServerURL = newServerURL;
-    log.info("Server URL being set to " + newServerURL);
-  }
-
-  /**
-   * This method handles setConfig messages that want to change the number of containers of a job
-   *
-   * @param containerCountAsString the new number of containers in a String format
-   */
-  private void handleYarnContainerChange(String containerCountAsString) throws IOException, YarnException {
-    String applicationId = yarnUtil.getRunningAppId(jobName, jobID);
-
-    int containerCount = Integer.valueOf(containerCountAsString);
-
-    //checking the input is valid
-    int currentNumTask = getCurrentNumTasks();
-    int currentNumContainers = getCurrentNumContainers();
-    if (containerCount == currentNumContainers) {
-      log.error("The new number of containers is equal to the current number of containers, skipping this message");
-      return;
-    }
-    if (containerCount <= 0) {
-      log.error("The number of containers cannot be zero or less, skipping this message");
-      return;
-    }
-
-
-    if (containerCount > currentNumTask) {
-      log.error("The number of containers cannot be more than the number of task, skipping this message");
-      return;
-    }
-
-
-    //killing the current job
-    log.info("Killing the current job");
-    yarnUtil.killApplication(applicationId);
-    //reset the global variables
-    coordinatorServerURL = null;
-
-
-    try {
-      //waiting for the job to be killed
-      String state = yarnUtil.getApplicationState(applicationId);
-      Thread.sleep(1000);
-      int countSleep = 1;
-
-      while (!state.equals("KILLED")) {
-        state = yarnUtil.getApplicationState(applicationId);
-        log.info("Job kill signal sent, but job not killed yet for " + applicationId + ". Sleeping for another 1000ms");
-        Thread.sleep(1000);
-        countSleep++;
-        if (countSleep > 10) {
-          throw new IllegalStateException("Job has not been killed after 10 attempts.");
-        }
-      }
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-      Thread.currentThread().interrupt();
-    }
-
-    log.info("Killed the current job successfully");
-
-    //start the job again
-    log.info("Staring the job again");
-    skipUnreadMessages();
-    JobRunner jobRunner = new JobRunner(config);
-    jobRunner.run(false);
-  }
-
-
-  /**
-   * This method returns the number of tasks in the job. It works by querying the server, and getting the job model.
-   * Then it extracts the number of tasks from the job model
-   *
-   * @return current number of tasks in the job
-   */
-  private int getCurrentNumTasks() {
-    int currentNumTasks = 0;
-    for (ContainerModel containerModel : SamzaContainer.readJobModel(coordinatorServerURL, defaultReadJobModelDelayMs).getContainers().values()) {
-      currentNumTasks += containerModel.getTasks().size();
-    }
-    return currentNumTasks;
-  }
-
-  /**
-   * This method returns the number of containers in the job. It works by querying the server, and getting the job model.
-   * Then it extracts the number of containers from the job model
-   *
-   * @return current number of containers in the job
-   */
-  private int getCurrentNumContainers() {
-    return SamzaContainer.readJobModel(coordinatorServerURL, defaultReadJobModelDelayMs).getContainers().values().size();
-  }
-
-
-  /**
-   * Gets the current value of the server URL that the job coordinator is serving the job model on.
-   *
-   * @return the current server URL. If null, it means the job has not set the server yet.
-   */
-  public String getCoordinatorServerURL() {
-    return coordinatorServerURL;
-  }
-
-  /**
-   * Main function for using the Config Manager. The main function starts a Config Manager, and reacts to all messages thereafter
-   * In order for this module to run, you have to add the following configurations to the config file:
-   * yarn.rm.address=localhost //the ip of the resource manager in yarn
-   * yarn.rm.port=8088 //the port of the resource manager http server
-   * Additionally, the config manager will periodically poll the coordinator stream to see if there are any new messages.
-   * This period is set to 100 ms by default. However, it can be configured by adding the following property to the input config file.
-   * configManager.polling.interval= &lt; polling interval &gt;
-   * To run the code use the following command:
-   * {path to samza deployment}/samza/bin/run-config-manager.sh  --config-factory={config-factory} --config-path={path to config file of a job}
-   *
-   * @param args input arguments for running ConfigManager.
-   */
-  public static void main(String[] args) {
-    CommandLine cmdline = new CommandLine();
-    OptionSet options = cmdline.parser().parse(args);
-    Config config = cmdline.loadConfig(options);
-    ConfigManager configManager = new ConfigManager(config);
-    configManager.run();
-  }
-
-
-}
diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
deleted file mode 100644
index 7331f61..0000000
--- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.autoscaling.utils;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.http.HttpHost;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.util.EntityUtils;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This is a helper class to interact with YARN. Some of the functionalities it provides are killing an application,
- * getting the state of an application, and getting an application id given the job name and job id.
- */
-public class YarnUtil {
-  private static final Logger log = LoggerFactory.getLogger(YarnUtil.class);
-  private final CloseableHttpClient httpClient;
-  private final HttpHost rmServer;
-  private final YarnClient yarnClient;
-
-  public YarnUtil(String rmAddress, int rmPort) {
-    this.httpClient = HttpClientBuilder.create().build();
-    this.rmServer = new HttpHost(rmAddress, rmPort, "http");
-    log.info("setting rm server to : " + rmServer);
-    YarnConfiguration hConfig = new YarnConfiguration();
-    hConfig.set(YarnConfiguration.RM_ADDRESS, rmAddress + ":" + YarnConfiguration.DEFAULT_RM_PORT);
-    yarnClient = YarnClient.createYarnClient();
-    yarnClient.init(hConfig);
-    yarnClient.start();
-  }
-
-  /**
-   * Queries the RM for all currently running applications and finds the application with the matching job name and id
-   *
-   * @param jobName the name of the job
-   * @param jobID   the job id
-   * @return the application id of the job running in yarn. If application id is not found, it will return null.
-   */
-  public String getRunningAppId(String jobName, int jobID) {
-
-    try {
-      HttpGet getRequest = new HttpGet("/ws/v1/cluster/apps");
-      HttpResponse httpResponse = httpClient.execute(rmServer, getRequest);
-      String applications = EntityUtils.toString(httpResponse.getEntity());
-      log.debug("applications: " + applications);
-
-      List<Map<String, String>> applicationList = parseYarnApplications(applications);
-      String name = jobName + "_" + jobID;
-      for (Map<String, String> application : applicationList) {
-        if ("RUNNING".equals(application.get("state")) && name.equals(application.get("name")) && application.containsKey("id")) {
-          return application.get("id");
-        }
-      }
-    } catch (NullPointerException | IOException e) {
-      e.printStackTrace();
-      throw new IllegalStateException("there is no valid application id for the given job name and job id. job name: " + jobName + " job id: " + jobID);
-    }
-
-    return null;
-  }
-
-  List<Map<String, String>> parseYarnApplications(String applications) throws IOException {
-    ObjectMapper mapper = new ObjectMapper();
-    Map<String, Map<String, List<Map<String, String>>>> yarnApplications = mapper.readValue(applications, new TypeReference<Map<String, Map<String, List<Map<String, Object>>>>>() {
-    });
-    return yarnApplications.get("apps").get("app");
-  }
-
-  /**
-   * This function returns the state of a given application. This state can be one of
-   * {"NEW", "NEW_SAVING", "SUBMITTED", "ACCEPTED", "RUNNING", "FINISHED", "FAILED", "KILLED"}
-   *
-   * @param applicationId the application id of the application whose state is being queried
-   * @return the state of the application which is one of the following values: {"NEW", "NEW_SAVING", "SUBMITTED", "ACCEPTED", "RUNNING", "FINISHED", "FAILED", "KILLED"}
-   * @throws IOException   Throws IO exception
-   * @throws YarnException in case of errors or if YARN rejects the request due to
-   *                       access-control restrictions.
-   */
-  public String getApplicationState(String applicationId) throws IOException, YarnException {
-
-    return yarnClient.getApplicationReport(getApplicationIDFromString(applicationId)).getYarnApplicationState().toString();
-
-  }
-
-  /**
-   * This function kills an application given the applicationId
-   *
-   * @param applicationId the application Id of the job to be killed
-   * @throws IOException   Throws IO exception
-   * @throws YarnException in case of errors or if YARN rejects the request due to
-   *                       access-control restrictions.
-   */
-  public void killApplication(String applicationId) throws IOException, YarnException {
-
-    log.info("killing job with application id: " + applicationId);
-
-    yarnClient.killApplication(getApplicationIDFromString(applicationId));
-  }
-
-  /**
-   * This function converts an application id in the form of a String into an {@link ApplicationId}
-   *
-   * @param appIDStr the application id in the form of a String
-   * @return the application id as an instance of ApplicationId class.
-   */
-  private ApplicationId getApplicationIDFromString(String appIDStr) {
-    String[] parts = appIDStr.split("_");
-    if (parts.length < 3) {
-      throw new IllegalStateException("the application id found is not valid. application id: " + appIDStr);
-    }
-    long timestamp = Long.valueOf(parts[1]);
-    int id = Integer.valueOf(parts[2]);
-    return ApplicationId.newInstance(timestamp, id);
-  }
-
-  /**
-   * This function stops the YarnUtil by stopping the yarn client and http client.
-   */
-  public void stop() {
-    try {
-      httpClient.close();
-    } catch (IOException e) {
-      log.error("HTTP Client failed to close.", e);
-    }
-    yarnClient.stop();
-  }
-
-}
diff --git a/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java b/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java
deleted file mode 100644
index 7b4b74e..0000000
--- a/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.samza.autoscaling.utils;
-
-import org.apache.commons.io.IOUtils;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-public class YarnUtilTest {
-
-  @Test
-  public void handleJsonArraysAsWellAsStrings() throws IOException {
-    YarnUtil yarnUtil = new YarnUtil("rm", 0);
-    List<Map<String, String>> applications = yarnUtil.parseYarnApplications(IOUtils.toString(getClass().getClassLoader().getResourceAsStream("exampleResourceManagerOutput.json")));
-    assertEquals("RUNNING", applications.get(0).get("state"));
-  }
-}
diff --git a/samza-autoscaling/src/test/resources/exampleResourceManagerOutput.json b/samza-autoscaling/src/test/resources/exampleResourceManagerOutput.json
deleted file mode 100644
index 9f8a025..0000000
--- a/samza-autoscaling/src/test/resources/exampleResourceManagerOutput.json
+++ /dev/null
@@ -1,121 +0,0 @@
-{
-  "apps": {
-    "app": [
-      {
-        "id": "application_1459790549146_0003",
-        "user": "root",
-        "name": "protodeserializer_1",
-        "queue": "default",
-        "state": "RUNNING",
-        "finalStatus": "UNDEFINED",
-        "progress": 0,
-        "trackingUI": "ApplicationMaster",
-        "trackingUrl": "http://yarnrm:8088/proxy/application_1459790549146_0003/",
-        "diagnostics": "",
-        "clusterId": 1459790549146,
-        "applicationType": "Samza",
-        "applicationTags": "",
-        "startedTime": 1459792852675,
-        "finishedTime": 0,
-        "elapsedTime": 738921,
-        "amContainerLogs": "http://yarnnm:8042/node/containerlogs/container_1459790549146_0003_01_000001/root",
-        "amHostHttpAddress": "yarnnm:8042",
-        "allocatedMB": 1024,
-        "allocatedVCores": 2,
-        "runningContainers": 2,
-        "memorySeconds": 749045,
-        "vcoreSeconds": 1462,
-        "preemptedResourceMB": 0,
-        "preemptedResourceVCores": 0,
-        "numNonAMContainerPreempted": 0,
-        "numAMContainerPreempted": 0,
-        "resourceRequests": [
-          {
-            "capability": {
-              "memory": 512,
-              "virtualCores": 1
-            },
-            "nodeLabelExpression": "",
-            "numContainers": 0,
-            "priority": {
-              "priority": 0
-            },
-            "relaxLocality": true,
-            "resourceName": "*"
-          },
-          {
-            "capability": {
-              "memory": 512,
-              "virtualCores": 1
-            },
-            "nodeLabelExpression": "",
-            "numContainers": 0,
-            "priority": {
-              "priority": 0
-            },
-            "relaxLocality": true,
-            "resourceName": "/default-rack"
-          }
-        ]
-      },
-      {
-        "id": "application_1459790549146_0002",
-        "user": "root",
-        "name": "protodeserializer_1",
-        "queue": "default",
-        "state": "KILLED",
-        "finalStatus": "KILLED",
-        "progress": 100,
-        "trackingUI": "History",
-        "trackingUrl": "http://yarnrm:8088/cluster/app/application_1459790549146_0002",
-        "diagnostics": "Application killed by user.",
-        "clusterId": 1459790549146,
-        "applicationType": "Samza",
-        "applicationTags": "",
-        "startedTime": 1459791820396,
-        "finishedTime": 1459792284264,
-        "elapsedTime": 463868,
-        "amContainerLogs": "http://yarnnm:8042/node/containerlogs/container_1459790549146_0002_01_000001/root",
-        "amHostHttpAddress": "yarnnm:8042",
-        "allocatedMB": -1,
-        "allocatedVCores": -1,
-        "runningContainers": -1,
-        "memorySeconds": 462177,
-        "vcoreSeconds": 902,
-        "preemptedResourceMB": 0,
-        "preemptedResourceVCores": 0,
-        "numNonAMContainerPreempted": 0,
-        "numAMContainerPreempted": 0
-      },
-      {
-        "id": "application_1459790549146_0001",
-        "user": "root",
-        "name": "protodeserializer_1",
-        "queue": "default",
-        "state": "KILLED",
-        "finalStatus": "KILLED",
-        "progress": 100,
-        "trackingUI": "History",
-        "trackingUrl": "http://yarnrm:8088/cluster/app/application_1459790549146_0001",
-        "diagnostics": "Application killed by user.",
-        "clusterId": 1459790549146,
-        "applicationType": "Samza",
-        "applicationTags": "",
-        "startedTime": 1459791108916,
-        "finishedTime": 1459791813659,
-        "elapsedTime": 704743,
-        "amContainerLogs": "http://yarnnm:8042/node/containerlogs/container_1459790549146_0001_01_000001/root",
-        "amHostHttpAddress": "yarnnm:8042",
-        "allocatedMB": -1,
-        "allocatedVCores": -1,
-        "runningContainers": -1,
-        "memorySeconds": 711605,
-        "vcoreSeconds": 1389,
-        "preemptedResourceMB": 0,
-        "preemptedResourceVCores": 0,
-        "numNonAMContainerPreempted": 0,
-        "numAMContainerPreempted": 0
-      }
-    ]
-  }
-}
\ No newline at end of file
diff --git a/samza-shell/src/main/bash/run-config-manager.sh b/samza-shell/src/main/bash/run-config-manager.sh
deleted file mode 100755
index 96777e7..0000000
--- a/samza-shell/src/main/bash/run-config-manager.sh
+++ /dev/null
@@ -1,25 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-if [[ -n $(find "$base_dir/lib" -regex ".*samza-log4j2.*.jar*") ]]; then
-    [[ $JAVA_OPTS != *-Dlog4j.configurationFile* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configurationFile=file:$(dirname $0)/log4j2-console.xml"
-elif [[ -n $(find "$base_dir/lib" -regex ".*samza-log4j.*.jar*") ]]; then
-    [[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
-fi
-
-exec $(dirname $0)/run-class.sh org.apache.samza.autoscaling.deployer.ConfigManager "$@"
diff --git a/settings.gradle b/settings.gradle
index c636706..cf4c9be 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -23,7 +23,6 @@ include \
   'samza-shell'
 
 def scalaModules = [
-        'samza-autoscaling',
         'samza-aws',
         'samza-azure',
         'samza-core',
diff --git a/sonar-project.properties b/sonar-project.properties
index 6e420da..2e9c7be 100644
--- a/sonar-project.properties
+++ b/sonar-project.properties
@@ -32,4 +32,4 @@ sonar.tests=src/test
 sonar.jacoco.reportPaths=build/jacoco/test.exec
 
 # List of subprojects here
-sonar.modules=samza-api,samza-autoscaling,samza-azure,samza-core,samza-elasticsearch,samza-hdfs,samza-kafka,samza-kv-inmemory,samza-kv-rocksdb,samza-kv-couchbase,samza-kv,samza-log4j,samza-rest,samza-shell,samza-test,samza-yarn
+sonar.modules=samza-api,samza-azure,samza-core,samza-elasticsearch,samza-hdfs,samza-kafka,samza-kv-inmemory,samza-kv-rocksdb,samza-kv-couchbase,samza-kv,samza-log4j,samza-rest,samza-shell,samza-test,samza-yarn
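
The deleted YarnUtil located a running application by scraping the RM REST endpoint /ws/v1/cluster/apps, whose response shape is preserved in the deleted exampleResourceManagerOutput.json fixture above. A self-contained sketch of that parse; it assumes the modern com.fasterxml Jackson artifact rather than the old codehaus mapper the deleted code used:

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.io.IOException;
    import java.util.List;
    import java.util.Map;

    public class YarnAppsParser {
      // Parses {"apps":{"app":[{...}, ...]}} as returned by the YARN RM REST API.
      static List<Map<String, Object>> parseYarnApplications(String json) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        Map<String, Map<String, List<Map<String, Object>>>> wrapper =
            mapper.readValue(json, new TypeReference<Map<String, Map<String, List<Map<String, Object>>>>>() { });
        return wrapper.get("apps").get("app");
      }
    }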


[samza] 05/06: SAMZA-2446: Invoke onCheckpoint only for registered SSPs (#1260)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhaiesp pushed a commit to branch 1.3.1
in repository https://gitbox.apache.org/repos/asf/samza.git

commit 964252aed6fb16b7952b93e56c3a0cd8f1072861
Author: bkonold <bk...@users.noreply.github.com>
AuthorDate: Tue Jan 28 15:43:59 2020 -0800

    SAMZA-2446: Invoke onCheckpoint only for registered SSPs (#1260)
---
 .../org/apache/samza/checkpoint/OffsetManager.scala     | 17 +++++++++++------
 .../org/apache/samza/checkpoint/TestOffsetManager.scala | 10 +++++++++-
 2 files changed, 20 insertions(+), 7 deletions(-)
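
The fix below narrows the offsets passed to checkpoint listeners to the SSPs actually registered with the OffsetManager, since changelog SSPs can appear in a Checkpoint under transactional state checkpointing without ever being registered. A Java sketch of the filter-then-group-by-system step; the input variables are assumed and the surrounding OffsetManager wiring is elided:

    // Assumed inputs: the task's checkpoint, its registered SSPs, and one
    // CheckpointListener per system name.
    Map<SystemStreamPartition, String> checkpointedOffsets = checkpoint.getOffsets();
    Set<SystemStreamPartition> registered =
        registeredSsps.getOrDefault(taskName, Collections.emptySet());

    checkpointedOffsets.entrySet().stream()
        .filter(e -> registered.contains(e.getKey()))   // drop unregistered SSPs, e.g. changelogs
        .collect(Collectors.groupingBy(e -> e.getKey().getSystem(),
            Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
        .forEach((systemName, systemOffsets) -> {
          CheckpointListener listener = checkpointListeners.get(systemName);
          if (listener != null) {                       // not every system has a listener
            listener.onCheckpoint(systemOffsets);
          }
        });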

diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 33fca8f..442d83f 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -353,13 +353,18 @@ class OffsetManager(
         }
       }
 
-      // invoke checkpoint listeners
-      checkpoint.getOffsets.asScala.groupBy { case (ssp, _) => ssp.getSystem }.foreach {
-        case (systemName:String, offsets: Map[SystemStreamPartition, String]) => {
-          // Option is empty if there is no checkpointListener for this systemName
-          checkpointListeners.get(systemName).foreach(_.onCheckpoint(offsets.asJava))
+      // Invoke checkpoint listeners only for SSPs that are registered with the OffsetManager. For example,
+      // changelog SSPs are not registered but may be present in the Checkpoint if transactional state checkpointing
+      // is enabled.
+      val registeredSSPs = systemStreamPartitions.getOrElse(taskName, Set[SystemStreamPartition]())
+      checkpoint.getOffsets.asScala
+        .filterKeys(registeredSSPs.contains)
+        .groupBy { case (ssp, _) => ssp.getSystem }.foreach {
+          case (systemName:String, offsets: Map[SystemStreamPartition, String]) => {
+            // Option is empty if there is no checkpointListener for this systemName
+            checkpointListeners.get(systemName).foreach(_.onCheckpoint(offsets.asJava))
+          }
         }
-      }
     }
 
     // delete corresponding startpoints after checkpoint is supposed to be committed
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index 50c793c..677504d 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -388,9 +388,11 @@ class TestOffsetManager {
     val systemName2 = "test-system2"
     val systemStream = new SystemStream(systemName, "test-stream")
     val systemStream2 = new SystemStream(systemName2, "test-stream2")
+    val systemStream3 = new SystemStream(systemName, "test-stream3")
     val partition = new Partition(0)
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
     val systemStreamPartition2 = new SystemStreamPartition(systemStream2, partition)
+    val unregisteredSystemStreamPartition = new SystemStreamPartition(systemStream3, partition)
     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
     val testStreamMetadata2 = new SystemStreamMetadata(systemStream2.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata, systemStream2->testStreamMetadata2)
@@ -420,7 +422,10 @@ class TestOffsetManager {
     offsetManager.start
     // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
     assertTrue(startpointManagerUtil.getStartpointManager.getFanOutForTask(taskName).containsKey(systemStreamPartition))
-    checkpoint(offsetManager, taskName)
+    val offsetsToCheckpoint = new java.util.HashMap[SystemStreamPartition, String]()
+    offsetsToCheckpoint.putAll(offsetManager.buildCheckpoint(taskName).getOffsets)
+    offsetsToCheckpoint.put(unregisteredSystemStreamPartition, "50")
+    offsetManager.writeCheckpoint(taskName, new Checkpoint(offsetsToCheckpoint))
 
     intercept[IllegalStateException] {
       // StartpointManager should stop after last fan out is removed
@@ -434,6 +439,9 @@ class TestOffsetManager {
     assertEquals("45", consumer.recentCheckpoint.get(systemStreamPartition))
     // make sure only the system with the callbacks gets them
     assertNull(consumer.recentCheckpoint.get(systemStreamPartition2))
+    // even though systemStream and systemStream3 share the same checkpointListener, callback should not execute for
+    // systemStream3 since it is not registered with the OffsetManager
+    assertNull(consumer.recentCheckpoint.get(unregisteredSystemStreamPartition))
 
     offsetManager.update(taskName, systemStreamPartition, "46")
     offsetManager.update(taskName, systemStreamPartition, "47")


[samza] 06/06: SAMZA-2447: Checkpoint dir removal should only search in valid store dirs (#1261)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhaiesp pushed a commit to branch 1.3.1
in repository https://gitbox.apache.org/repos/asf/samza.git

commit 17be8671ba317e01ed3abcae704bff82aac41aca
Author: bkonold <bk...@users.noreply.github.com>
AuthorDate: Tue Jan 28 15:56:44 2020 -0800

    SAMZA-2447: Checkpoint dir removal should only search in valid store dirs (#1261)
---
 .../TransactionalStateTaskStorageManager.scala     | 12 ++++++----
 .../TestTransactionalStateTaskStorageManager.java  | 27 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 5 deletions(-)
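
The root cause here: java.io.File.listFiles(FileFilter) returns null rather than an empty array when the receiver is not a directory (or is unreadable), so a stray regular file in the logged-store base dir made checkpoint-dir cleanup throw. A stand-alone sketch of the hazard and the added guard; the path and store name are hypothetical:

    import java.io.File;
    import java.io.FileFilter;
    import org.apache.commons.io.filefilter.WildcardFileFilter;

    public class ListFilesGuard {
      public static void main(String[] args) {
        File storeDir = new File("/tmp/state/notDirectory"); // hypothetical: a regular file, not a dir
        FileFilter filter = new WildcardFileFilter("myStore-*");
        File[] checkpointDirs = storeDir.listFiles(filter);  // null for a non-directory
        if (checkpointDirs != null) {                        // the guard this commit adds
          for (File checkpointDir : checkpointDirs) {
            // delete all but the latest checkpoint id, as in the diff below
          }
        }
      }
    }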

diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala
index 20c7271..0335710 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala
@@ -90,11 +90,13 @@ class TransactionalStateTaskStorageManager(
             val fileFilter: FileFilter = new WildcardFileFilter(taskStoreName + "-*")
             val checkpointDirs = storeDir.listFiles(fileFilter)
 
-            checkpointDirs
-              .filter(!_.getName.contains(latestCheckpointId.toString))
-              .foreach(checkpointDir => {
-                FileUtils.deleteDirectory(checkpointDir)
-              })
+            if (checkpointDirs != null) {
+              checkpointDirs
+                .filter(!_.getName.contains(latestCheckpointId.toString))
+                .foreach(checkpointDir => {
+                  FileUtils.deleteDirectory(checkpointDir)
+                })
+            }
           })
       }
     }
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java
index f2d4972..244a35b 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java
@@ -21,6 +21,7 @@ package org.apache.samza.storage;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import java.io.FileFilter;
 import scala.Option;
 import scala.collection.immutable.Map;
 
@@ -492,6 +493,32 @@ public class TestTransactionalStateTaskStorageManager {
     fail("Should have thrown an exception if no changelog offset found for checkpointed store");
   }
 
+  @Test
+  public void testRemoveOldCheckpointsWhenBaseDirContainsRegularFiles() {
+    TaskName taskName = new TaskName("Partition 0");
+    ContainerStorageManager containerStorageManager = mock(ContainerStorageManager.class);
+    Map<String, SystemStream> changelogSystemStreams = mock(Map.class);
+    SystemAdmins systemAdmins = mock(SystemAdmins.class);
+    File loggedStoreBaseDir = mock(File.class);
+    Partition changelogPartition = new Partition(0);
+    TaskMode taskMode = TaskMode.Active;
+    StorageManagerUtil storageManagerUtil = mock(StorageManagerUtil.class);
+
+    File mockStoreDir = mock(File.class);
+    String mockStoreDirName = "notDirectory";
+
+    when(loggedStoreBaseDir.listFiles()).thenReturn(new File[] {mockStoreDir});
+    when(mockStoreDir.getName()).thenReturn(mockStoreDirName);
+    when(storageManagerUtil.getTaskStoreDir(eq(loggedStoreBaseDir), eq(mockStoreDirName), eq(taskName), eq(taskMode))).thenReturn(mockStoreDir);
+    // null here can happen if listFiles is called on a non-directory
+    when(mockStoreDir.listFiles(any(FileFilter.class))).thenReturn(null);
+
+    TransactionalStateTaskStorageManager tsm = new TransactionalStateTaskStorageManager(taskName, containerStorageManager,
+        changelogSystemStreams, systemAdmins, loggedStoreBaseDir, changelogPartition, taskMode, storageManagerUtil);
+
+    tsm.removeOldCheckpoints(CheckpointId.create());
+  }
+
   private TransactionalStateTaskStorageManager buildTSM(ContainerStorageManager csm, Partition changelogPartition,
       StorageManagerUtil smu) {
     TaskName taskName = new TaskName("Partition 0");


[samza] 03/06: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation. (#1251)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhaiesp pushed a commit to branch 1.3.1
in repository https://gitbox.apache.org/repos/asf/samza.git

commit e250678fa4bb238ec82414d20ba2212da29a1f27
Author: shanthoosh <sv...@linkedin.com>
AuthorDate: Tue Jan 14 19:02:13 2020 -0800

    SAMZA-2431: Fix the checkpoint and changelog topic auto-creation. (#1251)
    
    * Fix the checkpoint and changelog topic creation configurations.
    
    * Address review comments.
    
    * Address review comments.
---
 .../java/org/apache/samza/system/StreamSpec.java   |  4 +-
 .../samza/system/kafka/KafkaSystemAdmin.java       |  7 +++-
 .../org/apache/samza/config/KafkaConfig.scala      |  6 ++-
 .../system/kafka/TestKafkaSystemAdminJava.java     | 26 ++++++++++++
 .../org/apache/samza/config/TestKafkaConfig.scala  | 46 +++++++++++++++++++++-
 5 files changed, 82 insertions(+), 7 deletions(-)
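
The central change in KafkaSystemAdmin below: when a checkpoint StreamSpec is converted to a KafkaStreamSpec, the spec's own topic config used to be discarded; it is now carried over. A sketch of the corrected derivation, using the names from the diff with the surrounding method elided:

    // spec, config, and KafkaStreamSpec as in KafkaSystemAdmin.toKafkaSpec.
    Properties checkpointTopicProperties = new Properties();
    checkpointTopicProperties.putAll(spec.getConfig());   // preserve caller-supplied topic configs

    KafkaStreamSpec kafkaSpec = KafkaStreamSpec
        .fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), spec.getSystemName()))
        .copyWithReplicationFactor(
            Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get()))
        .copyWithProperties(checkpointTopicProperties);   // the step this fix adds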

diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
index a1ad5e4..c122371 100644
--- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
@@ -19,6 +19,8 @@
 
 package org.apache.samza.system;
 
+import com.google.common.base.Joiner;
+
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
@@ -269,6 +271,6 @@ public class StreamSpec implements Serializable {
 
   @Override
   public String toString() {
-    return String.format("StreamSpec: id=%s, systemName=%s, pName=%s, partCount=%d.", id, systemName, physicalName, partitionCount);
+    return String.format("StreamSpec: id=%s, systemName=%s, pName=%s, partCount=%d, config=%s.", id, systemName, physicalName, partitionCount, Joiner.on(",").withKeyValueSeparator("=").join(config));
   }
 }
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index e5d6af1..ecb95a9 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -541,8 +541,11 @@ public class KafkaSystemAdmin implements SystemAdmin {
           new KafkaStreamSpec(spec.getId(), spec.getPhysicalName(), systemName, 1, coordinatorStreamReplicationFactor,
               coordinatorStreamProperties);
     } else if (spec.isCheckpointStream()) {
-      kafkaSpec = KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), systemName))
-              .copyWithReplicationFactor(Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get()));
+      Properties checkpointTopicProperties = new Properties();
+      checkpointTopicProperties.putAll(spec.getConfig());
+      kafkaSpec = KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), spec.getSystemName()))
+              .copyWithReplicationFactor(Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get()))
+              .copyWithProperties(checkpointTopicProperties);
     } else if (intermediateStreamProperties.containsKey(spec.getId())) {
       kafkaSpec = KafkaStreamSpec.fromSpec(spec);
       Properties properties = kafkaSpec.getProperties();
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 75fbb6b..3b5f5f3 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -316,7 +316,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
     val kafkaChangeLogProperties = new Properties
 
-    val appConfig = new ApplicationConfig(config)
     // SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.57,
     // 1.0.2, or 1.1.0 (see KAFKA-6568)
 
@@ -325,7 +324,10 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     //  - Set topic TTL to be the same as RocksDB TTL
     Option(config.get("stores.%s.rocksdb.ttl.ms" format name)) match {
       case Some(rocksDbTtl) =>
-        if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {
+        if (!rocksDbTtl.isEmpty && rocksDbTtl.toInt < 0) {
+          kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
+          kafkaChangeLogProperties.setProperty("max.message.bytes", getChangelogStreamMaxMessageByte(name))
+        } else if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {
           kafkaChangeLogProperties.setProperty("cleanup.policy", "delete")
           if (!config.containsKey("stores.%s.changelog.kafka.retention.ms" format name)) {
             kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(rocksDbTtl))
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index 7ca03f3..82d635f 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -115,6 +115,32 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
   }
 
   @Test
+  public void testToKafkaSpecForCheckpointStreamShouldReturnTheCorrectStreamSpecByPreservingTheConfig() {
+    String topicName = "testStream";
+    String streamId = "samza-internal-checkpoint-stream-id";
+    int partitionCount = 1;
+    Map<String, String> map = new HashMap<>();
+    map.put("cleanup.policy", "compact");
+    map.put("replication.factor", "3");
+    map.put("segment.bytes", "536870912");
+    map.put("delete.retention.ms", "86400000");
+
+    Config config = new MapConfig(map);
+
+    StreamSpec spec = new StreamSpec(streamId, topicName, SYSTEM, partitionCount, config);
+    KafkaSystemAdmin kafkaSystemAdmin = systemAdmin();
+    KafkaStreamSpec kafkaStreamSpec = kafkaSystemAdmin.toKafkaSpec(spec);
+    System.out.println(kafkaStreamSpec);
+    assertEquals(streamId, kafkaStreamSpec.getId());
+    assertEquals(topicName, kafkaStreamSpec.getPhysicalName());
+    assertEquals(partitionCount, kafkaStreamSpec.getPartitionCount());
+    assertEquals(3, kafkaStreamSpec.getReplicationFactor());
+    assertEquals("compact", kafkaStreamSpec.getConfig().get("cleanup.policy"));
+    assertEquals("536870912", kafkaStreamSpec.getConfig().get("segment.bytes"));
+    assertEquals("86400000", kafkaStreamSpec.getConfig().get("delete.retention.ms"));
+  }
+
+  @Test
   public void testToKafkaSpec() {
     String topicName = "testStream";
 
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 8558a85..00b103d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -22,14 +22,14 @@ package org.apache.samza.config
 import java.util.Properties
 import java.util.concurrent.TimeUnit
 
-import org.apache.samza.config.factories.PropertiesConfigFactory
 import org.junit.Assert._
+import org.junit.After
+import org.junit.Before
 import org.junit.Test
 
 import scala.collection.JavaConverters._
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.kafka.clients.producer.ProducerConfig
-import org.junit.Before
 
 class TestKafkaConfig {
 
@@ -47,6 +47,10 @@ class TestKafkaConfig {
     props.setProperty(JobConfig.JOB_NAME, "jobName")
   }
 
+  @After
+  def clearUpProperties(): Unit = {
+    props.clear()
+  }
 
   @Test
   def testStreamLevelFetchSizeOverride() {
@@ -82,6 +86,44 @@ class TestKafkaConfig {
   }
 
   @Test
+  def testChangeLogPropertiesShouldReturnCorrectTopicConfigurationForInfiniteTTLStores(): Unit = {
+    val props = new Properties
+    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092")
+    props.setProperty("systems." + SYSTEM_NAME + ".consumer.zookeeper.connect", "localhost:2181/")
+    props.setProperty(JobConfig.JOB_NAME, "jobName")
+
+    props.setProperty("stores.test1.changelog", "kafka.mychangelog1")
+    props.setProperty("stores.test1.rocksdb.ttl.ms", "-1")
+
+    val mapConfig = new MapConfig(props.asScala.asJava)
+    val kafkaConfig = new KafkaConfig(mapConfig)
+    val kafkaProperties = kafkaConfig.getChangelogKafkaProperties("test1")
+    assertEquals("compact", kafkaProperties.getProperty("cleanup.policy"))
+    assertEquals("536870912", kafkaProperties.getProperty("segment.bytes"))
+    assertEquals("1000012", kafkaProperties.getProperty("max.message.bytes"))
+    assertEquals("86400000", kafkaProperties.getProperty("delete.retention.ms"))
+  }
+
+  @Test
+  def testChangeLogPropertiesShouldReturnCorrectTopicConfigurationForStoresWithEmptyRocksDBTTL(): Unit = {
+    val props = new Properties
+    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092")
+    props.setProperty("systems." + SYSTEM_NAME + ".consumer.zookeeper.connect", "localhost:2181/")
+    props.setProperty(JobConfig.JOB_NAME, "jobName")
+
+    props.setProperty("stores.test1.changelog", "kafka.mychangelog1")
+
+    val mapConfig = new MapConfig(props.asScala.asJava)
+    val kafkaConfig = new KafkaConfig(mapConfig)
+    val kafkaProperties = kafkaConfig.getChangelogKafkaProperties("test1")
+    assertEquals("compact", kafkaProperties.getProperty("cleanup.policy"))
+    assertEquals("536870912", kafkaProperties.getProperty("segment.bytes"))
+    assertEquals("86400000", kafkaProperties.getProperty("delete.retention.ms"))
+    assertEquals("1000012", kafkaProperties.getProperty("max.message.bytes"))
+  }
+
+  @Test
   def testChangeLogProperties() {
     props.setProperty("job.changelog.system", SYSTEM_NAME)
     props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory")

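The two new tests above pin down how getChangelogKafkaProperties maps a store's RocksDB TTL onto Kafka topic configuration: a negative TTL, or no TTL at all, means the store never expires entries, so the changelog topic must retain the latest value for every key and gets cleanup.policy=compact plus a max.message.bytes override, while a positive, finite TTL maps to time-based deletion. A condensed Java sketch of just that decision; the helper name and shape are invented for illustration:

    // Illustrative only: condenses the cleanup-policy choice these tests assert on.
    static String cleanupPolicyFor(String rocksDbTtlMs) {
      if (rocksDbTtlMs == null || Long.parseLong(rocksDbTtlMs) < 0) {
        // Infinite TTL: every key must survive, so rely on log compaction.
        return "compact";
      }
      // Finite TTL: old changelog segments can be dropped by time instead.
      return "delete";
    }

The Long.parseLong above is deliberate: parsing the TTL as an int is exactly the overflow fixed by commit 04/06 later in this batch.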

[samza] 02/06: Fix the coordinator stream creation workflow.

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhaiesp pushed a commit to branch 1.3.1
in repository https://gitbox.apache.org/repos/asf/samza.git

commit 3c2863c1e2074f3132feaa6c315ec109726198d0
Author: Shanthoosh Venkataraman <sp...@usc.edu>
AuthorDate: Tue Jan 14 16:20:29 2020 -0800

    Fix the coordinator stream creation workflow.
---
 .../samza/system/kafka/KafkaSystemAdmin.java       | 10 +++---
 .../system/kafka/TestKafkaSystemAdminJava.java     | 39 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 5 deletions(-)

diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index 97229db..e5d6af1 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -464,19 +464,19 @@ public class KafkaSystemAdmin implements SystemAdmin {
     LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());
     final String REPL_FACTOR = "replication.factor";
 
-    KafkaStreamSpec kSpec = toKafkaSpec(streamSpec);
-    String topicName = kSpec.getPhysicalName();
+    KafkaStreamSpec kafkaStreamSpec = toKafkaSpec(streamSpec);
+    String topicName = kafkaStreamSpec.getPhysicalName();
 
     // create topic.
-    NewTopic newTopic = new NewTopic(topicName, kSpec.getPartitionCount(), (short) kSpec.getReplicationFactor());
+    NewTopic newTopic = new NewTopic(topicName, kafkaStreamSpec.getPartitionCount(), (short) kafkaStreamSpec.getReplicationFactor());
 
     // specify the configs
-    Map<String, String> streamConfig = new HashMap<>(streamSpec.getConfig());
+    Map<String, String> streamConfig = new HashMap<>(kafkaStreamSpec.getConfig());
     // HACK - replication.factor is invalid config for AdminClient.createTopics
     if (streamConfig.containsKey(REPL_FACTOR)) {
       String repl = streamConfig.get(REPL_FACTOR);
       LOG.warn("Configuration {}={} for topic={} is invalid. Using kSpec repl factor {}",
-          REPL_FACTOR, repl, kSpec.getPhysicalName(), kSpec.getReplicationFactor());
+          REPL_FACTOR, repl, kafkaStreamSpec.getPhysicalName(), kafkaStreamSpec.getReplicationFactor());
       streamConfig.remove(REPL_FACTOR);
     }
     newTopic.configs(new MapConfig(streamConfig));
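The substance of this fix is the switch from streamSpec.getConfig() to kafkaStreamSpec.getConfig(): toKafkaSpec is what folds the Kafka-specific topic properties into the spec (for the coordinator stream, notably cleanup.policy=compact), so reading the configs off the original streamSpec silently dropped them, and the coordinator topic could be created with plain retention-based deletion, leaving its messages eligible for expiry. A condensed sketch of the corrected flow; it mirrors the hunk above rather than introducing new behavior:

    // Derive the Kafka-specific spec first; it carries the topic-level configs.
    KafkaStreamSpec kafkaStreamSpec = toKafkaSpec(streamSpec);
    NewTopic newTopic = new NewTopic(
        kafkaStreamSpec.getPhysicalName(),
        kafkaStreamSpec.getPartitionCount(),
        (short) kafkaStreamSpec.getReplicationFactor());
    // Read the configs from the Kafka spec (the fix); streamSpec.getConfig() was the bug.
    Map<String, String> streamConfig = new HashMap<>(kafkaStreamSpec.getConfig());
    // replication.factor is not a legal per-topic config for AdminClient.createTopics;
    // it is already expressed on NewTopic itself, so strip it before calling configs().
    streamConfig.remove("replication.factor");
    newTopic.configs(streamConfig);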
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index 1431600..7ca03f3 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -19,15 +19,22 @@
 
 package org.apache.samza.system.kafka;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.samza.Partition;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
@@ -62,6 +69,38 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
   private static final String TEST_OFFSET = "10";
 
   @Test
+  public void testCreateStreamShouldCreateCoordinatorStreamWithCorrectTopicProperties() throws Exception {
+    String coordinatorTopicName = String.format("topic-name-%s", RandomStringUtils.randomAlphabetic(5));
+    StreamSpec coordinatorStreamSpec = KafkaStreamSpec.createCoordinatorStreamSpec(coordinatorTopicName, SYSTEM());
+
+    boolean hasCreatedStream = systemAdmin().createStream(coordinatorStreamSpec);
+
+    assertTrue(hasCreatedStream);
+
+    Map<String, String> coordinatorTopicProperties = getTopicConfigFromKafkaBroker(coordinatorTopicName);
+
+    assertEquals("compact", coordinatorTopicProperties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
+    assertEquals("26214400", coordinatorTopicProperties.get(TopicConfig.SEGMENT_BYTES_CONFIG));
+    assertEquals("86400000", coordinatorTopicProperties.get(TopicConfig.DELETE_RETENTION_MS_CONFIG));
+  }
+
+  private static Map<String, String> getTopicConfigFromKafkaBroker(String topicName) throws Exception {
+    List<ConfigResource> configResourceList = ImmutableList.of(
+        new ConfigResource(ConfigResource.Type.TOPIC, topicName));
+    Map<ConfigResource, org.apache.kafka.clients.admin.Config> configResourceConfigMap =
+        adminClient().describeConfigs(configResourceList).all().get();
+    Map<String, String> kafkaTopicConfig = new HashMap<>();
+
+    configResourceConfigMap.values().forEach(configEntry -> {
+      configEntry.entries().forEach(config -> {
+        kafkaTopicConfig.put(config.name(), config.value());
+      });
+    });
+
+    return kafkaTopicConfig;
+  }
+
+  @Test
   public void testGetOffsetsAfter() {
     SystemStreamPartition ssp1 = new SystemStreamPartition(SYSTEM, TOPIC, new Partition(0));
     SystemStreamPartition ssp2 = new SystemStreamPartition(SYSTEM, TOPIC, new Partition(1));

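One caveat worth noting about the broker-side check above: AdminClient.describeConfigs returns the topic's full effective configuration, broker defaults included, not only the overrides passed at creation time, which is why the helper can read back properties the test never set. If a test needed just the explicit overrides, filtering on ConfigEntry.isDefault() is one option; a sketch under that assumption (not part of this patch, and newer Kafka clients expose ConfigEntry.source() for the same purpose):

    // Hypothetical refinement of the helper: keep only non-default (overridden) entries.
    configEntry.entries().stream()
        .filter(entry -> !entry.isDefault())
        .forEach(entry -> kafkaTopicConfig.put(entry.name(), entry.value()));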

[samza] 04/06: Fix the RocksDB TTL type conversion in change log properties generation. (#1254)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhaiesp pushed a commit to branch 1.3.1
in repository https://gitbox.apache.org/repos/asf/samza.git

commit cccca7cf137ee168eed07238f5deeb0f60d9710d
Author: shanthoosh <sv...@linkedin.com>
AuthorDate: Fri Jan 17 10:19:34 2020 -0800

    Fix the RocksDB TTL type conversion in change log properties generation. (#1254)
---
 .../scala/org/apache/samza/config/KafkaConfig.scala   |  2 +-
 .../org/apache/samza/config/TestKafkaConfig.scala     | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 3b5f5f3..69a9966 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -324,7 +324,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     //  - Set topic TTL to be the same as RocksDB TTL
     Option(config.get("stores.%s.rocksdb.ttl.ms" format name)) match {
       case Some(rocksDbTtl) =>
-        if (!rocksDbTtl.isEmpty && rocksDbTtl.toInt < 0) {
+        if (!rocksDbTtl.isEmpty && rocksDbTtl.toLong < 0) {
           kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
           kafkaChangeLogProperties.setProperty("max.message.bytes", getChangelogStreamMaxMessageByte(name))
         } else if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {
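This one-word change is load-bearing: Scala's String.toInt delegates to Integer.parseInt, and a millisecond TTL overflows the int range quickly, since Integer.MAX_VALUE (2147483647) is just under 25 days of milliseconds. Any store TTL of roughly 25 days or more therefore threw NumberFormatException during changelog property generation instead of selecting the delete policy. A minimal Java illustration using the value from the test added below:

    // 28 days in milliseconds, as set in the new test.
    String ttl = "2419200000";
    long ttlMs = Long.parseLong(ttl);   // fine: well within the range of a long
    // Integer.parseInt(ttl) would throw NumberFormatException here,
    // because 2419200000 > Integer.MAX_VALUE (2147483647).
    assert ttlMs > Integer.MAX_VALUE;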
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 00b103d..64b476b 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -105,6 +105,25 @@ class TestKafkaConfig {
   }
 
   @Test
+  def testChangeLogPropertiesShouldReturnCorrectTopicConfigurationForLargeTTLStores(): Unit = {
+    val props = new Properties
+    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092")
+    props.setProperty("systems." + SYSTEM_NAME + ".consumer.zookeeper.connect", "localhost:2181/")
+    props.setProperty(JobConfig.JOB_NAME, "jobName")
+
+    props.setProperty("stores.test1.changelog", "kafka.mychangelog1")
+    // Set the RocksDB TTL to be 28 days.
+    props.setProperty("stores.test1.rocksdb.ttl.ms", "2419200000")
+
+    val mapConfig = new MapConfig(props.asScala.asJava)
+    val kafkaConfig = new KafkaConfig(mapConfig)
+    val kafkaProperties = kafkaConfig.getChangelogKafkaProperties("test1")
+    assertEquals("delete", kafkaProperties.getProperty("cleanup.policy"))
+    assertEquals("536870912", kafkaProperties.getProperty("segment.bytes"))
+    assertEquals("86400000", kafkaProperties.getProperty("delete.retention.ms"))
+  }
+
+  @Test
   def testChangeLogPropertiesShouldReturnCorrectTopicConfigurationForStoresWithEmptyRocksDBTTL(): Unit = {
     val props = new Properties
     props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092")