You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by sa...@apache.org on 2022/05/09 21:35:56 UTC
[incubator-heron] 01/07: [Sched] Removed Aurora from schedulers.
This is an automated email from the ASF dual-hosted git repository.
saadurrahman pushed a commit to branch saadurrahman/3829-Deprecate-Apache-Aurora-dev
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
commit bfd151cf14034660ef7e5d767984d24f4f4a93a1
Author: Saad Ur Rahman <sa...@apache.org>
AuthorDate: Mon May 9 17:18:23 2022 -0400
[Sched] Removed Aurora from schedulers.
Apache Aurora source and tests removed from scheduler.
---
heron/schedulers/src/java/BUILD | 21 --
.../scheduler/aurora/AuroraCLIController.java | 214 ------------
.../heron/scheduler/aurora/AuroraContext.java | 53 ---
.../heron/scheduler/aurora/AuroraController.java | 44 ---
.../apache/heron/scheduler/aurora/AuroraField.java | 39 ---
.../aurora/AuroraHeronShellController.java | 132 --------
.../heron/scheduler/aurora/AuroraLauncher.java | 58 ----
.../heron/scheduler/aurora/AuroraScheduler.java | 299 -----------------
heron/schedulers/tests/java/BUILD | 19 --
.../scheduler/aurora/AuroraCLIControllerTest.java | 200 -----------
.../heron/scheduler/aurora/AuroraContextTest.java | 49 ---
.../heron/scheduler/aurora/AuroraLauncherTest.java | 80 -----
.../scheduler/aurora/AuroraSchedulerTest.java | 372 ---------------------
13 files changed, 1580 deletions(-)
diff --git a/heron/schedulers/src/java/BUILD b/heron/schedulers/src/java/BUILD
index b223c7bc440..caa66719e13 100644
--- a/heron/schedulers/src/java/BUILD
+++ b/heron/schedulers/src/java/BUILD
@@ -93,27 +93,6 @@ genrule(
cmd = "cp $< $@",
)
-java_library(
- name = "aurora-scheduler-java",
- srcs = glob(["**/aurora/*.java"]),
- resources = glob(["**/aurora/*.aurora"]),
- deps = scheduler_deps_files,
-)
-
-java_binary(
- name = "aurora-scheduler-unshaded",
- srcs = glob(["**/aurora/*.java"]),
- resources = glob(["**/aurora/*.aurora"]),
- deps = scheduler_deps_files,
-)
-
-genrule(
- name = "heron-aurora-scheduler",
- srcs = [":aurora-scheduler-unshaded_deploy.jar"],
- outs = ["heron-aurora-scheduler.jar"],
- cmd = "cp $< $@",
-)
-
java_library(
name = "null-scheduler-java",
srcs = glob(
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraCLIController.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraCLIController.java
deleted file mode 100644
index 5cd710a7bc0..00000000000
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraCLIController.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Logger;
-import java.util.stream.Collectors;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.heron.spi.packing.PackingPlan;
-import org.apache.heron.spi.utils.ShellUtils;
-
-/**
- * Implementation of AuroraController that shells out to the Aurora CLI to control the Aurora
- * scheduler workflow of a topology.
- */
-class AuroraCLIController implements AuroraController {
- private static final Logger LOG = Logger.getLogger(AuroraCLIController.class.getName());
-
- private final String jobSpec;
- private final boolean isVerbose;
- private String auroraFilename;
-
- AuroraCLIController(
- String jobName,
- String cluster,
- String role,
- String env,
- String auroraFilename,
- boolean isVerbose) {
- this.auroraFilename = auroraFilename;
- this.isVerbose = isVerbose;
- this.jobSpec = String.format("%s/%s/%s/%s", cluster, role, env, jobName);
- }
-
- @Override
- public boolean createJob(Map<AuroraField, String> bindings, Map<String, String> extra) {
- List<String> auroraCmd =
- new ArrayList<>(Arrays.asList("aurora", "job", "create", "--wait-until", "RUNNING"));
-
- for (AuroraField field : bindings.keySet()) {
- auroraCmd.add("--bind");
- auroraCmd.add(String.format("%s=%s", field, bindings.get(field)));
- }
-
- if (!extra.isEmpty()) {
- for (String field : extra.keySet()) {
- if (field.equals(AuroraContext.JOB_TEMPLATE)) {
- auroraFilename = auroraFilename.replace("heron.aurora", extra.get(field));
- } else {
- auroraCmd.add("--bind");
- auroraCmd.add(String.format("%s=%s", field, extra.get(field)));
- }
- }
- }
-
- auroraCmd.add(jobSpec);
- auroraCmd.add(auroraFilename);
-
- if (isVerbose) {
- auroraCmd.add("--verbose");
- }
-
- return runProcess(auroraCmd);
- }
-
- // Kill an aurora job
- @Override
- public boolean killJob() {
- List<String> auroraCmd = new ArrayList<>(Arrays.asList("aurora", "job", "killall"));
- auroraCmd.add(jobSpec);
-
- appendAuroraCommandOptions(auroraCmd, isVerbose);
-
- return runProcess(auroraCmd);
- }
-
- // Restart an aurora job
- @Override
- public boolean restart(Integer containerId) {
- List<String> auroraCmd = new ArrayList<>(Arrays.asList("aurora", "job", "restart"));
- if (containerId != null) {
- auroraCmd.add(String.format("%s/%d", jobSpec, containerId));
- } else {
- auroraCmd.add(jobSpec);
- }
-
- appendAuroraCommandOptions(auroraCmd, isVerbose);
-
- return runProcess(auroraCmd);
- }
-
- @Override
- public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove) {
- String instancesToKill = getInstancesIdsToKill(containersToRemove);
- //aurora job kill <cluster>/<role>/<env>/<name>/<instance_ids>
- List<String> auroraCmd = new ArrayList<>(Arrays.asList(
- "aurora", "job", "kill", jobSpec + "/" + instancesToKill));
-
- appendAuroraCommandOptions(auroraCmd, isVerbose);
- LOG.info(String.format(
- "Killing %s aurora containers: %s", containersToRemove.size(), auroraCmd));
- if (!runProcess(auroraCmd)) {
- throw new RuntimeException("Failed to kill freed aurora instances: " + instancesToKill);
- }
- }
-
- private static final String ERR_PROMPT =
- "The topology can be in a strange stage. Please check carefully or redeploy the topology !!";
-
- @Override
- public Set<Integer> addContainers(Integer count) {
- //aurora job add <cluster>/<role>/<env>/<name>/<instance_id> <count>
- //clone instance 0
- List<String> auroraCmd = new ArrayList<>(Arrays.asList(
- "aurora", "job", "add", "--wait-until", "RUNNING",
- jobSpec + "/0", count.toString(), "--verbose"));
-
- LOG.info(String.format("Requesting %s new aurora containers %s", count, auroraCmd));
- StringBuilder stderr = new StringBuilder();
- if (!runProcess(auroraCmd, null, stderr)) {
- throw new RuntimeException(
- "Failed to create " + count + " new aurora instances. " + ERR_PROMPT);
- }
-
- if (stderr.length() <= 0) { // no container was added
- throw new RuntimeException("Empty output by Aurora. " + ERR_PROMPT);
- }
- return extractContainerIds(stderr.toString());
- }
-
- private Set<Integer> extractContainerIds(String auroraOutputStr) {
- String pattern = "Querying instance statuses: [";
- int idx1 = auroraOutputStr.indexOf(pattern);
- if (idx1 < 0) { // no container was added
- LOG.info("stdout & stderr by Aurora " + auroraOutputStr);
- return new HashSet<Integer>();
- }
- idx1 += pattern.length();
- int idx2 = auroraOutputStr.indexOf("]", idx1);
- String containerIdStr = auroraOutputStr.substring(idx1, idx2);
- LOG.info("container IDs returned by Aurora " + containerIdStr);
- return Arrays.asList(containerIdStr.split(", "))
- .stream().map(x->Integer.valueOf(x)).collect(Collectors.toSet());
- }
-
- // Utils method for unit tests
- @VisibleForTesting
- boolean runProcess(List<String> auroraCmd, StringBuilder stdout, StringBuilder stderr) {
- int status =
- ShellUtils.runProcess(auroraCmd.toArray(new String[auroraCmd.size()]),
- stderr != null ? stderr : new StringBuilder());
-
- if (status != 0) {
- LOG.severe(String.format(
- "Failed to run process. Command=%s, STDOUT=%s, STDERR=%s", auroraCmd, stdout, stderr));
- }
- return status == 0;
- }
-
- // Utils method for unit tests
- @VisibleForTesting
- boolean runProcess(List<String> auroraCmd) {
- return runProcess(auroraCmd, null, null);
- }
-
- private static String getInstancesIdsToKill(Set<PackingPlan.ContainerPlan> containersToRemove) {
- StringBuilder ids = new StringBuilder();
- for (PackingPlan.ContainerPlan containerPlan : containersToRemove) {
- if (ids.length() > 0) {
- ids.append(",");
- }
- ids.append(containerPlan.getId());
- }
- return ids.toString();
- }
-
- // Static method to append verbose and batching options if needed
- private static void appendAuroraCommandOptions(List<String> auroraCmd, boolean isVerbose) {
- // Append verbose if needed
- if (isVerbose) {
- auroraCmd.add("--verbose");
- }
-
- // Append batch size.
- // Note that we can not use "--no-batching" since "restart" command does not accept it.
- // So we play a small trick here by setting batch size Integer.MAX_VALUE.
- auroraCmd.add("--batch-size");
- auroraCmd.add(Integer.toString(Integer.MAX_VALUE));
- }
-}
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraContext.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraContext.java
deleted file mode 100644
index d48cf8d7580..00000000000
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraContext.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-import java.io.File;
-
-import org.apache.heron.spi.common.Config;
-import org.apache.heron.spi.common.Context;
-
-public final class AuroraContext extends Context {
- public static final String JOB_LINK_TEMPLATE = "heron.scheduler.job.link.template";
- public static final String JOB_TEMPLATE = "heron.scheduler.job.template";
- public static final String JOB_MAX_KILL_ATTEMPTS = "heron.scheduler.job.max.kill.attempts";
- public static final String JOB_KILL_RETRY_INTERVAL_MS =
- "heron.scheduler.job.kill.retry.interval.ms";
-
- private AuroraContext() {
- }
-
- public static String getJobLinkTemplate(Config config) {
- return config.getStringValue(JOB_LINK_TEMPLATE);
- }
-
- public static String getHeronAuroraPath(Config config) {
- return config.getStringValue(JOB_TEMPLATE,
- new File(Context.heronConf(config), "heron.aurora").getPath());
- }
-
- public static int getJobMaxKillAttempts(Config config) {
- return config.getIntegerValue(JOB_MAX_KILL_ATTEMPTS, 5);
- }
-
- public static long getJobKillRetryIntervalMs(Config config) {
- return config.getLongValue(JOB_KILL_RETRY_INTERVAL_MS, 2000);
- }
-}
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraController.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraController.java
deleted file mode 100644
index a70333908a0..00000000000
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraController.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.heron.spi.packing.PackingPlan;
-
-/**
- * Interface that defines how a client interacts with aurora to control the job lifecycle
- */
-public interface AuroraController {
-
- boolean createJob(Map<AuroraField, String> auroraProperties, Map<String, String> extra);
- boolean killJob();
-
- /**
- * Restarts a given container, or the entire job if containerId is null
- * @param containerId ID of container to restart, or entire job if null
- * @return the boolean return value
- */
- boolean restart(Integer containerId);
-
- void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove);
- Set<Integer> addContainers(Integer count);
-}
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraField.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraField.java
deleted file mode 100644
index cc04ec10c98..00000000000
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraField.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-/**
- * Field names passed to aurora controllers during job creation
- */
-public enum AuroraField {
- CLUSTER,
- CORE_PACKAGE_URI,
- CPUS_PER_CONTAINER,
- DISK_PER_CONTAINER,
- ENVIRON,
- EXECUTOR_BINARY,
- NUM_CONTAINERS,
- RAM_PER_CONTAINER,
- ROLE,
- TIER,
- TOPOLOGY_ARGUMENTS,
- TOPOLOGY_NAME,
- TOPOLOGY_PACKAGE_URI
-}
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraHeronShellController.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraHeronShellController.java
deleted file mode 100644
index aa6c708ca7a..00000000000
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraHeronShellController.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-import java.net.HttpURLConnection;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Logger;
-
-import org.apache.heron.proto.system.PhysicalPlans.StMgr;
-import org.apache.heron.spi.common.Config;
-import org.apache.heron.spi.common.Context;
-import org.apache.heron.spi.packing.PackingPlan;
-import org.apache.heron.spi.statemgr.IStateManager;
-import org.apache.heron.spi.statemgr.SchedulerStateManagerAdaptor;
-import org.apache.heron.spi.utils.NetworkUtils;
-import org.apache.heron.spi.utils.ReflectionUtils;
-
-/**
- * Implementation of AuroraController that is a wrapper of AuroraCLIController.
- * The difference is `restart` command:
- * 1. restart whole topology: delegate to AuroraCLIController
- * 2. restart container 0: delegate to AuroraCLIController
- * 3. restart container x(x>0): call heron-shell endpoint `/killexecutor`
- * For backpressure, only containers with heron-stmgr may send out backpressure.
- * This class is to handle `restart backpressure containers inside container`,
- * while delegating to AuroraCLIController for all the other scenarios.
- */
-class AuroraHeronShellController implements AuroraController {
- private static final Logger LOG = Logger.getLogger(AuroraHeronShellController.class.getName());
-
- private final String topologyName;
- private final AuroraCLIController cliController;
- private final SchedulerStateManagerAdaptor stateMgrAdaptor;
-
- AuroraHeronShellController(String jobName, String cluster, String role, String env,
- String auroraFilename, boolean isVerbose, Config localConfig)
- throws ClassNotFoundException, InstantiationException, IllegalAccessException {
- this.topologyName = jobName;
- this.cliController =
- new AuroraCLIController(jobName, cluster, role, env, auroraFilename, isVerbose);
-
- Config config = Config.toClusterMode(localConfig);
- String stateMgrClass = Context.stateManagerClass(config);
- IStateManager stateMgr = ReflectionUtils.newInstance(stateMgrClass);
- stateMgr.initialize(config);
- stateMgrAdaptor = new SchedulerStateManagerAdaptor(stateMgr, 5000);
- }
-
- @Override
- public boolean createJob(Map<AuroraField, String> bindings, Map<String, String> extra) {
- return cliController.createJob(bindings, extra);
- }
-
- @Override
- public boolean killJob() {
- return cliController.killJob();
- }
-
- private StMgr searchContainer(Integer id) {
- String prefix = "stmgr-" + id;
- for (StMgr sm : stateMgrAdaptor.getPhysicalPlan(topologyName).getStmgrsList()) {
- if (sm.getId().equals(prefix)) {
- return sm;
- }
- }
- return null;
- }
-
- // Restart an aurora container
- @Override
- public boolean restart(Integer containerId) {
- // there is no backpressure for container 0, delegate to aurora client
- if (containerId == null || containerId == 0) {
- return cliController.restart(containerId);
- }
-
- if (stateMgrAdaptor == null) {
- LOG.warning("SchedulerStateManagerAdaptor not initialized");
- return false;
- }
-
- StMgr sm = searchContainer(containerId);
- if (sm == null) {
- LOG.warning("container not found in pplan " + containerId);
- return false;
- }
-
- String url = "http://" + sm.getHostName() + ":" + sm.getShellPort() + "/killexecutor";
- String payload = "secret=" + stateMgrAdaptor.getExecutionState(topologyName).getTopologyId();
- LOG.info("sending `kill container` to " + url + "; payload: " + payload);
-
- HttpURLConnection con = NetworkUtils.getHttpConnection(url);
- try {
- if (NetworkUtils.sendHttpPostRequest(con, "X", payload.getBytes())) {
- return NetworkUtils.checkHttpResponseCode(con, 200);
- } else { // if heron-shell command fails, delegate to aurora client
- LOG.info("heron-shell killexecutor failed; try aurora client ..");
- return cliController.restart(containerId);
- }
- } finally {
- con.disconnect();
- }
- }
-
- @Override
- public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove) {
- cliController.removeContainers(containersToRemove);
- }
-
- @Override
- public Set<Integer> addContainers(Integer count) {
- return cliController.addContainers(count);
- }
-}
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraLauncher.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraLauncher.java
deleted file mode 100644
index 023fcd98626..00000000000
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraLauncher.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-import org.apache.heron.scheduler.utils.LauncherUtils;
-import org.apache.heron.spi.common.Config;
-import org.apache.heron.spi.packing.PackingPlan;
-import org.apache.heron.spi.scheduler.ILauncher;
-import org.apache.heron.spi.scheduler.IScheduler;
-
-/**
- * Launch topology locally to Aurora.
- */
-
-public class AuroraLauncher implements ILauncher {
- private Config config;
- private Config runtime;
-
- @Override
- public void initialize(Config mConfig, Config mRuntime) {
- this.config = mConfig;
- this.runtime = mRuntime;
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public boolean launch(PackingPlan packing) {
- LauncherUtils launcherUtils = LauncherUtils.getInstance();
- Config ytruntime = launcherUtils.createConfigWithPackingDetails(runtime, packing);
- return launcherUtils.onScheduleAsLibrary(config, ytruntime, getScheduler(), packing);
- }
-
- // Get AuroraScheduler
- protected IScheduler getScheduler() {
- return new AuroraScheduler();
- }
-}
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraScheduler.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraScheduler.java
deleted file mode 100644
index 9c386f22a95..00000000000
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraScheduler.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Scanner;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.google.common.base.Optional;
-
-import org.apache.heron.api.generated.TopologyAPI;
-import org.apache.heron.api.utils.TopologyUtils;
-import org.apache.heron.api.utils.Utils;
-import org.apache.heron.proto.scheduler.Scheduler;
-import org.apache.heron.scheduler.UpdateTopologyManager;
-import org.apache.heron.scheduler.utils.Runtime;
-import org.apache.heron.scheduler.utils.SchedulerUtils;
-import org.apache.heron.spi.common.Config;
-import org.apache.heron.spi.common.Context;
-import org.apache.heron.spi.common.Key;
-import org.apache.heron.spi.common.TokenSub;
-import org.apache.heron.spi.packing.PackingPlan;
-import org.apache.heron.spi.packing.Resource;
-import org.apache.heron.spi.scheduler.IScalable;
-import org.apache.heron.spi.scheduler.IScheduler;
-
-public class AuroraScheduler implements IScheduler, IScalable {
- private static final Logger LOG = Logger.getLogger(AuroraLauncher.class.getName());
-
- private Config config;
- private Config runtime;
- private AuroraController controller;
- private UpdateTopologyManager updateTopologyManager;
-
- @Override
- public void initialize(Config mConfig, Config mRuntime) {
- this.config = Config.toClusterMode(mConfig);
- this.runtime = mRuntime;
- try {
- this.controller = getController();
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
- LOG.severe("AuroraController initialization failed " + e.getMessage());
- }
- this.updateTopologyManager =
- new UpdateTopologyManager(config, runtime, Optional.<IScalable>of(this));
- }
-
- /**
- * Get an AuroraController based on the config and runtime
- *
- * @return AuroraController
- */
- protected AuroraController getController()
- throws ClassNotFoundException, InstantiationException, IllegalAccessException {
- Boolean cliController = config.getBooleanValue(Key.AURORA_CONTROLLER_CLASS);
- Config localConfig = Config.toLocalMode(this.config);
- if (cliController) {
- return new AuroraCLIController(
- Runtime.topologyName(runtime),
- Context.cluster(localConfig),
- Context.role(localConfig),
- Context.environ(localConfig),
- AuroraContext.getHeronAuroraPath(localConfig),
- Context.verbose(localConfig));
- } else {
- return new AuroraHeronShellController(
- Runtime.topologyName(runtime),
- Context.cluster(localConfig),
- Context.role(localConfig),
- Context.environ(localConfig),
- AuroraContext.getHeronAuroraPath(localConfig),
- Context.verbose(localConfig),
- localConfig);
- }
- }
-
- @Override
- public void close() {
- if (updateTopologyManager != null) {
- updateTopologyManager.close();
- }
- }
-
- @Override
- public boolean onSchedule(PackingPlan packing) {
- if (packing == null || packing.getContainers().isEmpty()) {
- LOG.severe("No container requested. Can't schedule");
- return false;
- }
-
- LOG.info("Launching topology in aurora");
-
- // Align the cpu, RAM, disk to the maximal one, and set them to ScheduledResource
- PackingPlan updatedPackingPlan = packing.cloneWithHomogeneousScheduledResource();
- SchedulerUtils.persistUpdatedPackingPlan(Runtime.topologyName(runtime), updatedPackingPlan,
- Runtime.schedulerStateManagerAdaptor(runtime));
-
- // Use the ScheduledResource to create aurora properties
- // the ScheduledResource is guaranteed to be set after calling
- // cloneWithHomogeneousScheduledResource in the above code
- Resource containerResource =
- updatedPackingPlan.getContainers().iterator().next().getScheduledResource().get();
- Map<AuroraField, String> auroraProperties = createAuroraProperties(containerResource);
- Map<String, String> extraProperties = createExtraProperties(containerResource);
-
- return controller.createJob(auroraProperties, extraProperties);
- }
-
- @Override
- public List<String> getJobLinks() {
- List<String> jobLinks = new ArrayList<>();
-
- //Only the aurora job page is returned
- String jobLinkFormat = AuroraContext.getJobLinkTemplate(config);
- if (jobLinkFormat != null && !jobLinkFormat.isEmpty()) {
- String jobLink = TokenSub.substitute(config, jobLinkFormat);
- jobLinks.add(jobLink);
- }
-
- return jobLinks;
- }
-
- @Override
- public boolean onKill(Scheduler.KillTopologyRequest request) {
- // The aurora service can be unavailable or unstable for a while,
- // we will try to kill the job with multiple attempts
- int attempts = AuroraContext.getJobMaxKillAttempts(config);
- long retryIntervalMs = AuroraContext.getJobKillRetryIntervalMs(config);
- LOG.info("Will try " + attempts + " attempts at interval: " + retryIntervalMs + " ms");
-
- // First attempt
- boolean res = controller.killJob();
- attempts--;
-
- // Failure retry
- while (!res && attempts > 0) {
- LOG.warning("Failed to kill the topology. Will retry in " + retryIntervalMs + " ms...");
- Utils.sleep(retryIntervalMs);
-
- // Retry the killJob()
- res = controller.killJob();
- attempts--;
- }
-
- return res;
- }
-
- @Override
- public boolean onRestart(Scheduler.RestartTopologyRequest request) {
- Integer containerId = null;
- if (request.getContainerIndex() != -1) {
- containerId = request.getContainerIndex();
- }
- return controller.restart(containerId);
- }
-
- @Override
- public boolean onUpdate(Scheduler.UpdateTopologyRequest request) {
- try {
- updateTopologyManager.updateTopology(
- request.getCurrentPackingPlan(), request.getProposedPackingPlan());
- } catch (ExecutionException | InterruptedException e) {
- LOG.log(Level.SEVERE, "Could not update topology for request: " + request, e);
- return false;
- }
- return true;
- }
-
- private static final String CONFIRMED_YES = "y";
- boolean hasConfirmedWithUser(int newContainerCount) {
- LOG.info(String.format("After update there will be %d more containers. "
- + "Please make sure there are sufficient resources to update this job. "
- + "Continue update? [y/N]: ", newContainerCount));
- Scanner scanner = new Scanner(System.in);
- String userInput = scanner.nextLine();
- return CONFIRMED_YES.equalsIgnoreCase(userInput);
- }
-
- @Override
- public Set<PackingPlan.ContainerPlan> addContainers(
- Set<PackingPlan.ContainerPlan> containersToAdd) {
- Set<PackingPlan.ContainerPlan> remapping = new HashSet<>();
- if ("prompt".equalsIgnoreCase(Context.updatePrompt(config))
- && !hasConfirmedWithUser(containersToAdd.size())) {
- LOG.warning("Scheduler updated topology canceled.");
- return remapping;
- }
-
- // Do the actual containers adding
- LinkedList<Integer> newAddedContainerIds = new LinkedList<>(
- controller.addContainers(containersToAdd.size()));
- if (newAddedContainerIds.size() != containersToAdd.size()) {
- throw new RuntimeException(
- "Aurora returned different container count " + newAddedContainerIds.size()
- + "; input count was " + containersToAdd.size());
- }
- // Do the remapping:
- // use the `newAddedContainerIds` to replace the container id in the `containersToAdd`
- for (PackingPlan.ContainerPlan cp : containersToAdd) {
- PackingPlan.ContainerPlan newContainerPlan =
- new PackingPlan.ContainerPlan(
- newAddedContainerIds.pop(), cp.getInstances(),
- cp.getRequiredResource(), cp.getScheduledResource().orNull());
- remapping.add(newContainerPlan);
- }
- LOG.info("The remapping structure: " + remapping);
- return remapping;
- }
-
- @Override
- public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove) {
- controller.removeContainers(containersToRemove);
- }
-
- protected Map<AuroraField, String> createAuroraProperties(Resource containerResource) {
- Map<AuroraField, String> auroraProperties = new HashMap<>();
-
- TopologyAPI.Topology topology = Runtime.topology(runtime);
-
- auroraProperties.put(AuroraField.EXECUTOR_BINARY,
- Context.executorBinary(config));
-
- List<String> topologyArgs = new ArrayList<>();
- SchedulerUtils.addExecutorTopologyArgs(topologyArgs, config, runtime);
- String args = String.join(" ", topologyArgs);
- auroraProperties.put(AuroraField.TOPOLOGY_ARGUMENTS, args);
-
- auroraProperties.put(AuroraField.CLUSTER, Context.cluster(config));
- auroraProperties.put(AuroraField.ENVIRON, Context.environ(config));
- auroraProperties.put(AuroraField.ROLE, Context.role(config));
- auroraProperties.put(AuroraField.TOPOLOGY_NAME, topology.getName());
-
- auroraProperties.put(AuroraField.CPUS_PER_CONTAINER,
- Double.toString(containerResource.getCpu()));
- auroraProperties.put(AuroraField.DISK_PER_CONTAINER,
- Long.toString(containerResource.getDisk().asBytes()));
- auroraProperties.put(AuroraField.RAM_PER_CONTAINER,
- Long.toString(containerResource.getRam().asBytes()));
-
- auroraProperties.put(AuroraField.NUM_CONTAINERS,
- Integer.toString(1 + TopologyUtils.getNumContainers(topology)));
-
- // Job configuration attribute 'production' is deprecated.
- // Use 'tier' attribute instead
- // See: http://aurora.apache.org/documentation/latest/reference/configuration/#job-objects
- if ("prod".equals(Context.environ(config))) {
- auroraProperties.put(AuroraField.TIER, "preferred");
- } else {
- auroraProperties.put(AuroraField.TIER, "preemptible");
- }
-
- String heronCoreReleasePkgURI = Context.corePackageUri(config);
- String topologyPkgURI = Runtime.topologyPackageUri(runtime).toString();
-
- auroraProperties.put(AuroraField.CORE_PACKAGE_URI, heronCoreReleasePkgURI);
- auroraProperties.put(AuroraField.TOPOLOGY_PACKAGE_URI, topologyPkgURI);
-
- return auroraProperties;
- }
-
- protected Map<String, String> createExtraProperties(Resource containerResource) {
- Map<String, String> extraProperties = new HashMap<>();
-
- if (config.containsKey(Key.SCHEDULER_PROPERTIES)) {
- String[] meta = config.getStringValue(Key.SCHEDULER_PROPERTIES).split(",");
- extraProperties.put(AuroraContext.JOB_TEMPLATE, meta[0]);
- for (int idx = 1; idx < meta.length; idx++) {
- extraProperties.put("AURORA_METADATA_" + idx, meta[idx]);
- }
- }
-
- return extraProperties;
- }
-}
diff --git a/heron/schedulers/tests/java/BUILD b/heron/schedulers/tests/java/BUILD
index 3ad9ffedf36..f3122052f0c 100644
--- a/heron/schedulers/tests/java/BUILD
+++ b/heron/schedulers/tests/java/BUILD
@@ -25,9 +25,6 @@ scheduler_deps_files = \
common_deps_files + \
spi_deps_files
-aurora_deps_files = [
- "//heron/schedulers/src/java:aurora-scheduler-java",
-]
yarn_deps_files = [
"//heron/packing/src/java:roundrobin-packing",
@@ -89,22 +86,6 @@ nomad_deps_files = \
"//heron/schedulers/src/java:scheduler-utils-java",
]
-java_library(
- name = "aurora-tests",
- srcs = glob(["**/aurora/*.java"]),
- deps = scheduler_deps_files + aurora_deps_files + ["@maven//:commons_cli_commons_cli"],
-)
-
-java_tests(
- size = "small",
- test_classes = [
- "org.apache.heron.scheduler.aurora.AuroraSchedulerTest",
- "org.apache.heron.scheduler.aurora.AuroraLauncherTest",
- "org.apache.heron.scheduler.aurora.AuroraCLIControllerTest",
- "org.apache.heron.scheduler.aurora.AuroraContextTest",
- ],
- runtime_deps = [":aurora-tests"],
-)
java_library(
name = "yarn-tests",
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraCLIControllerTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraCLIControllerTest.java
deleted file mode 100644
index 4fad3998915..00000000000
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraCLIControllerTest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import org.apache.heron.spi.packing.PackingPlan;
-import org.apache.heron.spi.utils.PackingTestUtils;
-
-public class AuroraCLIControllerTest {
- private static final String JOB_NAME = "jobName";
- private static final String CLUSTER = "cluster";
- private static final String ROLE = "role";
- private static final String ENV = "gz";
- private static final String AURORA_FILENAME = "file.aurora";
- private static final String VERBOSE_CONFIG = "--verbose";
- private static final String BATCH_CONFIG = "--batch-size";
- private static final String JOB_SPEC = String.format("%s/%s/%s/%s", CLUSTER, ROLE, ENV, JOB_NAME);
- private static final boolean IS_VERBOSE = true;
-
- private AuroraCLIController controller;
-
- @BeforeClass
- public static void beforeClass() throws Exception {
-
- }
-
- @AfterClass
- public static void afterClass() throws Exception {
- }
-
- @Before
- public void setUp() throws Exception {
- controller = Mockito.spy(
- new AuroraCLIController(JOB_NAME, CLUSTER, ROLE, ENV, AURORA_FILENAME, IS_VERBOSE));
- }
-
- @After
- public void after() throws Exception {
- }
-
- @Test
- public void testCreateJob() throws Exception {
- Map<AuroraField, String> bindings = new HashMap<>();
- Map<String, String> bindings2 = new HashMap<>();
- List<String> expectedCommand = asList("aurora job create --wait-until RUNNING %s %s %s",
- JOB_SPEC, AURORA_FILENAME, VERBOSE_CONFIG);
-
- // Failed
- Mockito.doReturn(false).when(controller).runProcess(Matchers.anyListOf(String.class));
- Assert.assertFalse(controller.createJob(bindings, bindings2));
- Mockito.verify(controller).runProcess(Mockito.eq(expectedCommand));
-
- // Happy path
- Mockito.doReturn(true).when(controller).runProcess(Matchers.anyListOf(String.class));
- Assert.assertTrue(controller.createJob(bindings, bindings2));
- Mockito.verify(controller, Mockito.times(2)).runProcess(expectedCommand);
- }
-
- @Test
- public void testKillJob() throws Exception {
- List<String> expectedCommand = asList("aurora job killall %s %s %s %d",
- JOB_SPEC, VERBOSE_CONFIG, BATCH_CONFIG, Integer.MAX_VALUE);
-
- // Failed
- Mockito.doReturn(false).when(controller).runProcess(Matchers.anyListOf(String.class));
- Assert.assertFalse(controller.killJob());
- Mockito.verify(controller).runProcess(Mockito.eq(expectedCommand));
-
- // Happy path
- Mockito.doReturn(true).when(controller).runProcess(Matchers.anyListOf(String.class));
- Assert.assertTrue(controller.killJob());
- Mockito.verify(controller, Mockito.times(2)).runProcess(expectedCommand);
- }
-
- @Test
- public void testRestartJob() throws Exception {
- int containerId = 1;
- List<String> expectedCommand = asList("aurora job restart %s/%s %s %s %d",
- JOB_SPEC, containerId, VERBOSE_CONFIG, BATCH_CONFIG, Integer.MAX_VALUE);
-
- // Failed
- Mockito.doReturn(false).when(controller).runProcess(Matchers.anyListOf(String.class));
- Assert.assertFalse(controller.restart(containerId));
- Mockito.verify(controller).runProcess(Mockito.eq(expectedCommand));
-
- // Happy path
- Mockito.doReturn(true).when(controller).runProcess(Matchers.anyListOf(String.class));
- Assert.assertTrue(controller.restart(containerId));
- Mockito.verify(controller, Mockito.times(2)).runProcess(expectedCommand);
- }
-
- @Test
- public void testRemoveContainers() {
- class ContainerPlanComparator implements Comparator<PackingPlan.ContainerPlan> {
- @Override
- public int compare(PackingPlan.ContainerPlan o1, PackingPlan.ContainerPlan o2) {
- return ((Integer) o1.getId()).compareTo(o2.getId());
- }
- }
- SortedSet<PackingPlan.ContainerPlan> containers = new TreeSet<>(new ContainerPlanComparator());
- containers.add(PackingTestUtils.testContainerPlan(3));
- containers.add(PackingTestUtils.testContainerPlan(5));
-
- List<String> expectedCommand = asList("aurora job kill %s/3,5 %s %s %d",
- JOB_SPEC, VERBOSE_CONFIG, BATCH_CONFIG, Integer.MAX_VALUE);
-
- Mockito.doReturn(true).when(controller).runProcess(Matchers.anyListOf(String.class));
- controller.removeContainers(containers);
- Mockito.verify(controller).runProcess(Mockito.eq(expectedCommand));
- }
-
- @Test
- public void testAddContainers() {
- Integer containersToAdd = 3;
- List<String> expectedCommand = asList(
- "aurora job add --wait-until RUNNING %s/0 %s %s",
- JOB_SPEC, containersToAdd.toString(), VERBOSE_CONFIG);
-
- Mockito.doAnswer(new Answer<Boolean>() {
- @Override
- public Boolean answer(InvocationOnMock arg0) throws Throwable {
- final StringBuilder originalArgument = (StringBuilder) (arg0.getArguments())[2];
- originalArgument.append("Querying instance statuses: [1, 2, 3]");
- return true;
- }
- }).when(controller).runProcess(
- Matchers.anyListOf(String.class),
- Matchers.any(StringBuilder.class),
- Matchers.any(StringBuilder.class));
- Set<Integer> ret = controller.addContainers(containersToAdd);
- Assert.assertEquals(containersToAdd.intValue(), ret.size());
- Mockito.verify(controller)
- .runProcess(Matchers.eq(expectedCommand), Matchers.any(), Matchers.any());
- }
-
- @Test
- public void testAddContainersFailure() {
- Integer containersToAdd = 3;
- List<String> expectedCommand = asList(
- "aurora job add --wait-until RUNNING %s/0 %s %s",
- JOB_SPEC, containersToAdd.toString(), VERBOSE_CONFIG);
-
- Mockito.doAnswer(new Answer<Boolean>() {
- @Override
- public Boolean answer(InvocationOnMock arg0) throws Throwable {
- final StringBuilder originalArgument = (StringBuilder) (arg0.getArguments())[2];
- originalArgument.append("Querying instance statuses: x");
- return true;
- }
- }).when(controller).runProcess(
- Matchers.anyListOf(String.class),
- Matchers.any(StringBuilder.class),
- Matchers.any(StringBuilder.class));
- Set<Integer> ret = controller.addContainers(containersToAdd);
- Assert.assertEquals(0, ret.size());
- Mockito.verify(controller)
- .runProcess(Matchers.eq(expectedCommand), Matchers.any(), Matchers.any());
- }
-
- private static List<String> asList(String command, Object... values) {
- return new ArrayList<>(Arrays.asList(String.format(command, values).split(" ")));
- }
-}
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraContextTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraContextTest.java
deleted file mode 100644
index 4f4017f41a7..00000000000
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraContextTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.heron.spi.common.Config;
-import org.apache.heron.spi.common.Key;
-
-public class AuroraContextTest {
-
- @Test
- public void testUsesConfigString() {
- final String auroraTemplate = "/dir/test.aurora";
- Config config = Config.newBuilder()
- .put(AuroraContext.JOB_TEMPLATE, auroraTemplate)
- .put(Key.HERON_CONF, "/test")
- .build();
- Assert.assertEquals("Expected to use value from JOB_TEMPLATE config",
- auroraTemplate, AuroraContext.getHeronAuroraPath(config));
- }
-
- @Test
- public void testFallback() {
- Config config = Config.newBuilder()
- .put(Key.HERON_CONF, "/test")
- .build();
- Assert.assertEquals("Expected to use heron_conf/heron.aurora", "/test/heron.aurora",
- AuroraContext.getHeronAuroraPath(config));
- }
-}
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraLauncherTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraLauncherTest.java
deleted file mode 100644
index 76e94a5d9e9..00000000000
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraLauncherTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import org.apache.heron.scheduler.utils.LauncherUtils;
-import org.apache.heron.spi.common.Config;
-import org.apache.heron.spi.packing.PackingPlan;
-import org.apache.heron.spi.scheduler.IScheduler;
-
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore("jdk.internal.reflect.*")
-@PrepareForTest(LauncherUtils.class)
-public class AuroraLauncherTest {
- @Test
- public void testLaunch() throws Exception {
- Config config = Config.newBuilder().build();
- AuroraLauncher launcher = Mockito.spy(AuroraLauncher.class);
- launcher.initialize(config, config);
-
- LauncherUtils mockLauncherUtils = Mockito.mock(LauncherUtils.class);
- PowerMockito.spy(LauncherUtils.class);
- PowerMockito.doReturn(mockLauncherUtils).when(LauncherUtils.class, "getInstance");
-
- // Failed to schedule
- Mockito.when(mockLauncherUtils.onScheduleAsLibrary(
- Mockito.any(Config.class),
- Mockito.any(Config.class),
- Mockito.any(IScheduler.class),
- Mockito.any(PackingPlan.class))).thenReturn(false);
-
- Assert.assertFalse(launcher.launch(Mockito.mock(PackingPlan.class)));
- Mockito.verify(mockLauncherUtils).onScheduleAsLibrary(
- Mockito.any(Config.class),
- Mockito.any(Config.class),
- Mockito.any(IScheduler.class),
- Mockito.any(PackingPlan.class));
-
- // Happy path
- Mockito.when(mockLauncherUtils.onScheduleAsLibrary(
- Mockito.any(Config.class),
- Mockito.any(Config.class),
- Mockito.any(IScheduler.class),
- Mockito.any(PackingPlan.class))).thenReturn(true);
-
- Assert.assertTrue(launcher.launch(Mockito.mock(PackingPlan.class)));
- Mockito.verify(mockLauncherUtils, Mockito.times(2)).onScheduleAsLibrary(
- Mockito.any(Config.class),
- Mockito.any(Config.class),
- Mockito.any(IScheduler.class),
- Mockito.any(PackingPlan.class));
-
- launcher.close();
- }
-}
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraSchedulerTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraSchedulerTest.java
deleted file mode 100644
index a73a7e0676d..00000000000
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraSchedulerTest.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.heron.api.generated.TopologyAPI;
-import org.apache.heron.common.basics.ByteAmount;
-import org.apache.heron.common.utils.topology.TopologyTests;
-import org.apache.heron.proto.scheduler.Scheduler;
-import org.apache.heron.proto.system.PackingPlans;
-import org.apache.heron.scheduler.SubmitterMain;
-import org.apache.heron.spi.common.Config;
-import org.apache.heron.spi.common.Key;
-import org.apache.heron.spi.common.TokenSub;
-import org.apache.heron.spi.packing.PackingPlan;
-import org.apache.heron.spi.packing.Resource;
-import org.apache.heron.spi.statemgr.SchedulerStateManagerAdaptor;
-import org.apache.heron.spi.utils.PackingTestUtils;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore("jdk.internal.reflect.*")
-@PrepareForTest({TokenSub.class, Config.class})
-public class AuroraSchedulerTest {
- private static final String AURORA_PATH = "path.aurora";
- private static final String PACKING_PLAN_ID = "packing.plan.id";
- private static final String TOPOLOGY_NAME = "topologyName";
- private static final int CONTAINER_ID = 7;
-
- private static AuroraScheduler scheduler;
-
- @Before
- public void setUp() throws Exception {
- }
-
- @After
- public void after() throws Exception {
- }
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- scheduler = Mockito.spy(AuroraScheduler.class);
- doReturn(new HashMap<String, String>())
- .when(scheduler).createAuroraProperties(Mockito.any(Resource.class));
- }
-
- @AfterClass
- public static void afterClass() throws Exception {
- scheduler.close();
- }
-
- /**
- * Tests that we can schedule
- */
- @Test
- public void testOnSchedule() throws Exception {
- AuroraController controller = Mockito.mock(AuroraController.class);
- doReturn(controller).when(scheduler).getController();
-
- SchedulerStateManagerAdaptor stateManager = mock(SchedulerStateManagerAdaptor.class);
- Config runtime = Mockito.mock(Config.class);
- when(runtime.get(Key.SCHEDULER_STATE_MANAGER_ADAPTOR)).thenReturn(stateManager);
- when(runtime.getStringValue(Key.TOPOLOGY_NAME)).thenReturn(TOPOLOGY_NAME);
-
- Config mConfig = Mockito.mock(Config.class);
- PowerMockito.mockStatic(Config.class);
- when(Config.toClusterMode(mConfig)).thenReturn(mConfig);
-
- when(mConfig.getStringValue(eq(AuroraContext.JOB_TEMPLATE),
- anyString())).thenReturn(AURORA_PATH);
-
- scheduler.initialize(mConfig, runtime);
-
- // Fail to schedule due to null PackingPlan
- Assert.assertFalse(scheduler.onSchedule(null));
-
- PackingPlan plan = new PackingPlan(PACKING_PLAN_ID, new HashSet<PackingPlan.ContainerPlan>());
- assertTrue(plan.getContainers().isEmpty());
-
- // Fail to schedule due to PackingPlan is empty
- Assert.assertFalse(scheduler.onSchedule(plan));
-
- // Construct valid PackingPlan
- Set<PackingPlan.ContainerPlan> containers = new HashSet<>();
- containers.add(PackingTestUtils.testContainerPlan(CONTAINER_ID));
- PackingPlan validPlan = new PackingPlan(PACKING_PLAN_ID, containers);
-
- // Failed to create job via controller
- doReturn(false).when(controller)
- .createJob(Matchers.anyMapOf(AuroraField.class, String.class),
- Matchers.anyMapOf(String.class, String.class));
- doReturn(true).when(stateManager)
- .updatePackingPlan(any(PackingPlans.PackingPlan.class), eq(TOPOLOGY_NAME));
-
- Assert.assertFalse(scheduler.onSchedule(validPlan));
-
- Mockito.verify(controller)
- .createJob(Matchers.anyMapOf(AuroraField.class, String.class),
- Matchers.anyMapOf(String.class, String.class));
- Mockito.verify(stateManager)
- .updatePackingPlan(any(PackingPlans.PackingPlan.class), eq(TOPOLOGY_NAME));
-
- // Happy path
- doReturn(true).when(controller)
- .createJob(Matchers.anyMapOf(AuroraField.class, String.class),
- Matchers.anyMapOf(String.class, String.class));
- assertTrue(scheduler.onSchedule(validPlan));
-
- Mockito.verify(controller, Mockito.times(2))
- .createJob(Matchers.anyMapOf(AuroraField.class, String.class),
- Matchers.anyMapOf(String.class, String.class));
- Mockito.verify(stateManager, Mockito.times(2))
- .updatePackingPlan(any(PackingPlans.PackingPlan.class), eq(TOPOLOGY_NAME));
- }
-
- @Test
- public void testOnKill() throws Exception {
- Config mockConfig = Mockito.mock(Config.class);
- PowerMockito.mockStatic(Config.class);
- when(Config.toClusterMode(mockConfig)).thenReturn(mockConfig);
-
- AuroraController controller = Mockito.mock(AuroraController.class);
- doReturn(controller).when(scheduler).getController();
- scheduler.initialize(mockConfig, Mockito.mock(Config.class));
-
- // Failed to kill job via controller
- doReturn(false).when(controller).killJob();
- Assert.assertFalse(scheduler.onKill(Scheduler.KillTopologyRequest.getDefaultInstance()));
- Mockito.verify(controller).killJob();
-
- // Happy path
- doReturn(true).when(controller).killJob();
- assertTrue(scheduler.onKill(Scheduler.KillTopologyRequest.getDefaultInstance()));
- Mockito.verify(controller, Mockito.times(2)).killJob();
- }
-
- @Test
- public void testOnRestart() throws Exception {
- Config mockConfig = Mockito.mock(Config.class);
- PowerMockito.mockStatic(Config.class);
- when(Config.toClusterMode(mockConfig)).thenReturn(mockConfig);
-
- AuroraController controller = Mockito.mock(AuroraController.class);
- doReturn(controller).when(scheduler).getController();
- scheduler.initialize(mockConfig, Mockito.mock(Config.class));
-
- // Construct the RestartTopologyRequest
- int containerToRestart = 1;
- Scheduler.RestartTopologyRequest restartTopologyRequest =
- Scheduler.RestartTopologyRequest.newBuilder().
- setTopologyName(TOPOLOGY_NAME).setContainerIndex(containerToRestart).
- build();
-
- // Failed to kill job via controller
- doReturn(false).when(
- controller).restart(containerToRestart);
- Assert.assertFalse(scheduler.onRestart(restartTopologyRequest));
- Mockito.verify(controller).restart(containerToRestart);
-
- // Happy path
- doReturn(true).when(
- controller).restart(containerToRestart);
- assertTrue(scheduler.onRestart(restartTopologyRequest));
- Mockito.verify(controller, Mockito.times(2)).restart(containerToRestart);
- }
-
- @Test
- public void testGetJobLinks() throws Exception {
- final String JOB_LINK_FORMAT = "http://go/${CLUSTER}/${ROLE}/${ENVIRON}/${TOPOLOGY}";
- final String SUBSTITUTED_JOB_LINK = "http://go/local/heron/test/test_topology";
-
- Config mockConfig = Mockito.mock(Config.class);
- when(mockConfig.getStringValue(AuroraContext.JOB_LINK_TEMPLATE))
- .thenReturn(JOB_LINK_FORMAT);
-
- PowerMockito.mockStatic(Config.class);
- when(Config.toClusterMode(mockConfig)).thenReturn(mockConfig);
-
- AuroraController controller = Mockito.mock(AuroraController.class);
- doReturn(controller).when(scheduler).getController();
- scheduler.initialize(mockConfig, Mockito.mock(Config.class));
-
- PowerMockito.spy(TokenSub.class);
- PowerMockito.doReturn(SUBSTITUTED_JOB_LINK)
- .when(TokenSub.class, "substitute", mockConfig, JOB_LINK_FORMAT);
-
- List<String> result = scheduler.getJobLinks();
-
- assertEquals(1, result.size());
- assertTrue(result.get(0).equals(SUBSTITUTED_JOB_LINK));
- }
-
-
- @Test
- public void testProperties() throws URISyntaxException {
- TopologyAPI.Topology topology = TopologyTests.createTopology(
- TOPOLOGY_NAME, new org.apache.heron.api.Config(),
- "spoutName", "boltName", 1, 1);
-
- Config runtime = mock(Config.class);
- when(runtime.get(Key.TOPOLOGY_DEFINITION)).thenReturn(topology);
- when(runtime.get(Key.TOPOLOGY_PACKAGE_URI)).thenReturn(new URI("http://foo/bar"));
-
- // This must mimic how SubmitterMain loads configs
- CommandLine commandLine = mock(CommandLine.class);
- when(commandLine.getOptionValue("cluster")).thenReturn("some_cluster");
- when(commandLine.getOptionValue("role")).thenReturn("some_role");
- when(commandLine.getOptionValue("environment")).thenReturn("some_env");
- when(commandLine.getOptionValue("heron_home")).thenReturn("/some/heron/home");
- when(commandLine.getOptionValue("config_path")).thenReturn("/some/config/path");
- when(commandLine.getOptionValue("topology_package")).thenReturn("jar");
- when(commandLine.getOptionValue("topology_defn")).thenReturn("/mock/defnFile.defn");
- when(commandLine.getOptionValue("topology_bin")).thenReturn("binaryFile.jar");
- Config config = Mockito.spy(SubmitterMain.loadConfig(commandLine, topology));
-
- AuroraScheduler testScheduler = new AuroraScheduler();
- testScheduler.initialize(config, runtime);
- Resource containerResource =
- new Resource(2.3, ByteAmount.fromGigabytes(2), ByteAmount.fromGigabytes(3));
- Map<AuroraField, String> properties = testScheduler.createAuroraProperties(containerResource);
-
- // this part is key, the conf path in the config is absolute to the install dir, but what
- // aurora properties get below is the relative ./heron-conf path to be used when run remotely
- assertEquals("Invalid value for key " + Key.HERON_CONF,
- "/some/config/path", config.getStringValue(Key.HERON_CONF));
-
- String expectedConf = "./heron-conf";
- String expectedBin = "./heron-core/bin";
- String expectedLib = "./heron-core/lib";
- String expectedDist = "./heron-core/dist";
- for (AuroraField field : AuroraField.values()) {
- boolean asserted = false;
- Object expected = null;
- Object found = properties.get(field);
- switch (field) {
- case CORE_PACKAGE_URI:
- expected = expectedDist + "/heron-core.tar.gz";
- break;
- case CPUS_PER_CONTAINER:
- expected = Double.valueOf(containerResource.getCpu()).toString();
- break;
- case DISK_PER_CONTAINER:
- expected = Long.valueOf(containerResource.getDisk().asBytes()).toString();
- break;
- case RAM_PER_CONTAINER:
- expected = Long.valueOf(containerResource.getRam().asBytes()).toString();
- break;
- case TIER:
- expected = "preemptible";
- break;
- case NUM_CONTAINERS:
- expected = "2";
- break;
- case EXECUTOR_BINARY:
- expected = expectedBin + "/heron-executor";
- break;
- case TOPOLOGY_PACKAGE_URI:
- expected = "http://foo/bar";
- break;
- case TOPOLOGY_ARGUMENTS:
- expected = "--topology-name=topologyName"
- + " --topology-id=" + topology.getId()
- + " --topology-defn-file=defnFile.defn"
- + " --state-manager-connection=null"
- + " --state-manager-root=null"
- + " --state-manager-config-file=" + expectedConf + "/statemgr.yaml"
- + " --tmanager-binary=" + expectedBin + "/heron-tmanager"
- + " --stmgr-binary=" + expectedBin + "/heron-stmgr"
- + " --metrics-manager-classpath=" + expectedLib + "/metricsmgr/*"
- + " --instance-jvm-opts=\"\""
- + " --classpath=binaryFile.jar"
- + " --heron-internals-config-file=" + expectedConf + "/heron_internals.yaml"
- + " --override-config-file=" + expectedConf + "/override.yaml"
- + " --component-ram-map=null"
- + " --component-jvm-opts=\"\""
- + " --pkg-type=jar"
- + " --topology-binary-file=binaryFile.jar"
- + " --heron-java-home=/usr/lib/jvm/default-java"
- + " --heron-shell-binary=" + expectedBin + "/heron-shell"
- + " --cluster=some_cluster"
- + " --role=some_role"
- + " --environment=some_env"
- + " --instance-classpath=" + expectedLib + "/instance/*"
- + " --metrics-sinks-config-file=" + expectedConf + "/metrics_sinks.yaml"
- + " --scheduler-classpath=" + expectedLib + "/scheduler/*:./heron-core"
- + "/lib/packing/*:" + expectedLib + "/statemgr/*"
- + " --python-instance-binary=" + expectedBin + "/heron-python-instance"
- + " --cpp-instance-binary=" + expectedBin + "/heron-cpp-instance"
- + " --metricscache-manager-classpath=" + expectedLib + "/metricscachemgr/*"
- + " --metricscache-manager-mode=disabled"
- + " --is-stateful=false"
- + " --checkpoint-manager-classpath=" + expectedLib + "/ckptmgr/*:"
- + expectedLib + "/statefulstorage/*:"
- + " --stateful-config-file=" + expectedConf + "/stateful.yaml"
- + " --checkpoint-manager-ram=1073741824"
- + " --health-manager-mode=disabled"
- + " --health-manager-classpath=" + expectedLib + "/healthmgr/*";
- break;
- case CLUSTER:
- expected = "some_cluster";
- break;
- case ENVIRON:
- expected = "some_env";
- break;
- case ROLE:
- expected = "some_role";
- break;
- case TOPOLOGY_NAME:
- expected = "topologyName";
- break;
- default:
- fail(String.format(
- "Expected value for Aurora field %s not found in test (found=%s)", field, found));
- }
- if (!asserted) {
- assertEquals("Incorrect value found for field " + field, expected, found);
- }
- properties.remove(field);
- }
-
- assertTrue("The following aurora fields were not set by the scheduler: " + properties,
- properties.isEmpty());
- }
-}