You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by sa...@apache.org on 2022/05/09 21:35:56 UTC

[incubator-heron] 01/07: [Sched] Removed Aurora from schedulers.

This is an automated email from the ASF dual-hosted git repository.

saadurrahman pushed a commit to branch saadurrahman/3829-Deprecate-Apache-Aurora-dev
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git

commit bfd151cf14034660ef7e5d767984d24f4f4a93a1
Author: Saad Ur Rahman <sa...@apache.org>
AuthorDate: Mon May 9 17:18:23 2022 -0400

    [Sched] Removed Aurora from schedulers.
    
    Apache Aurora source and tests removed from scheduler.
---
 heron/schedulers/src/java/BUILD                    |  21 --
 .../scheduler/aurora/AuroraCLIController.java      | 214 ------------
 .../heron/scheduler/aurora/AuroraContext.java      |  53 ---
 .../heron/scheduler/aurora/AuroraController.java   |  44 ---
 .../apache/heron/scheduler/aurora/AuroraField.java |  39 ---
 .../aurora/AuroraHeronShellController.java         | 132 --------
 .../heron/scheduler/aurora/AuroraLauncher.java     |  58 ----
 .../heron/scheduler/aurora/AuroraScheduler.java    | 299 -----------------
 heron/schedulers/tests/java/BUILD                  |  19 --
 .../scheduler/aurora/AuroraCLIControllerTest.java  | 200 -----------
 .../heron/scheduler/aurora/AuroraContextTest.java  |  49 ---
 .../heron/scheduler/aurora/AuroraLauncherTest.java |  80 -----
 .../scheduler/aurora/AuroraSchedulerTest.java      | 372 ---------------------
 13 files changed, 1580 deletions(-)

diff --git a/heron/schedulers/src/java/BUILD b/heron/schedulers/src/java/BUILD
index b223c7bc440..caa66719e13 100644
--- a/heron/schedulers/src/java/BUILD
+++ b/heron/schedulers/src/java/BUILD
@@ -93,27 +93,6 @@ genrule(
     cmd = "cp $< $@",
 )
 
-java_library(
-    name = "aurora-scheduler-java",
-    srcs = glob(["**/aurora/*.java"]),
-    resources = glob(["**/aurora/*.aurora"]),
-    deps = scheduler_deps_files,
-)
-
-java_binary(
-    name = "aurora-scheduler-unshaded",
-    srcs = glob(["**/aurora/*.java"]),
-    resources = glob(["**/aurora/*.aurora"]),
-    deps = scheduler_deps_files,
-)
-
-genrule(
-    name = "heron-aurora-scheduler",
-    srcs = [":aurora-scheduler-unshaded_deploy.jar"],
-    outs = ["heron-aurora-scheduler.jar"],
-    cmd = "cp $< $@",
-)
-
 java_library(
     name = "null-scheduler-java",
     srcs = glob(
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraCLIController.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraCLIController.java
deleted file mode 100644
index 5cd710a7bc0..00000000000
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraCLIController.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Logger;
-import java.util.stream.Collectors;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.heron.spi.packing.PackingPlan;
-import org.apache.heron.spi.utils.ShellUtils;
-
-/**
- * Implementation of AuroraController that shells out to the Aurora CLI to control the Aurora
- * scheduler workflow of a topology.
- */
-class AuroraCLIController implements AuroraController {
-  private static final Logger LOG = Logger.getLogger(AuroraCLIController.class.getName());
-
-  private final String jobSpec;
-  private final boolean isVerbose;
-  private String auroraFilename;
-
-  AuroraCLIController(
-      String jobName,
-      String cluster,
-      String role,
-      String env,
-      String auroraFilename,
-      boolean isVerbose) {
-    this.auroraFilename = auroraFilename;
-    this.isVerbose = isVerbose;
-    this.jobSpec = String.format("%s/%s/%s/%s", cluster, role, env, jobName);
-  }
-
-  @Override
-  public boolean createJob(Map<AuroraField, String> bindings, Map<String, String> extra) {
-    List<String> auroraCmd =
-        new ArrayList<>(Arrays.asList("aurora", "job", "create", "--wait-until", "RUNNING"));
-
-    for (AuroraField field : bindings.keySet()) {
-      auroraCmd.add("--bind");
-      auroraCmd.add(String.format("%s=%s", field, bindings.get(field)));
-    }
-
-    if (!extra.isEmpty()) {
-      for (String field : extra.keySet()) {
-        if (field.equals(AuroraContext.JOB_TEMPLATE)) {
-          auroraFilename = auroraFilename.replace("heron.aurora", extra.get(field));
-        } else {
-          auroraCmd.add("--bind");
-          auroraCmd.add(String.format("%s=%s", field, extra.get(field)));
-        }
-      }
-    }
-
-    auroraCmd.add(jobSpec);
-    auroraCmd.add(auroraFilename);
-
-    if (isVerbose) {
-      auroraCmd.add("--verbose");
-    }
-
-    return runProcess(auroraCmd);
-  }
-
-  // Kill an aurora job
-  @Override
-  public boolean killJob() {
-    List<String> auroraCmd = new ArrayList<>(Arrays.asList("aurora", "job", "killall"));
-    auroraCmd.add(jobSpec);
-
-    appendAuroraCommandOptions(auroraCmd, isVerbose);
-
-    return runProcess(auroraCmd);
-  }
-
-  // Restart an aurora job
-  @Override
-  public boolean restart(Integer containerId) {
-    List<String> auroraCmd = new ArrayList<>(Arrays.asList("aurora", "job", "restart"));
-    if (containerId != null) {
-      auroraCmd.add(String.format("%s/%d", jobSpec, containerId));
-    } else {
-      auroraCmd.add(jobSpec);
-    }
-
-    appendAuroraCommandOptions(auroraCmd, isVerbose);
-
-    return runProcess(auroraCmd);
-  }
-
-  @Override
-  public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove) {
-    String instancesToKill = getInstancesIdsToKill(containersToRemove);
-    //aurora job kill <cluster>/<role>/<env>/<name>/<instance_ids>
-    List<String> auroraCmd = new ArrayList<>(Arrays.asList(
-        "aurora", "job", "kill", jobSpec + "/" + instancesToKill));
-
-    appendAuroraCommandOptions(auroraCmd, isVerbose);
-    LOG.info(String.format(
-        "Killing %s aurora containers: %s", containersToRemove.size(), auroraCmd));
-    if (!runProcess(auroraCmd)) {
-      throw new RuntimeException("Failed to kill freed aurora instances: " + instancesToKill);
-    }
-  }
-
-  private static final String ERR_PROMPT =
-      "The topology can be in a strange stage. Please check carefully or redeploy the topology !!";
-
-  @Override
-  public Set<Integer> addContainers(Integer count) {
-    //aurora job add <cluster>/<role>/<env>/<name>/<instance_id> <count>
-    //clone instance 0
-    List<String> auroraCmd = new ArrayList<>(Arrays.asList(
-        "aurora", "job", "add", "--wait-until", "RUNNING",
-        jobSpec + "/0", count.toString(), "--verbose"));
-
-    LOG.info(String.format("Requesting %s new aurora containers %s", count, auroraCmd));
-    StringBuilder stderr = new StringBuilder();
-    if (!runProcess(auroraCmd, null, stderr)) {
-      throw new RuntimeException(
-          "Failed to create " + count + " new aurora instances. " + ERR_PROMPT);
-    }
-
-    if (stderr.length() <= 0) { // no container was added
-      throw new RuntimeException("Empty output by Aurora. " + ERR_PROMPT);
-    }
-    return extractContainerIds(stderr.toString());
-  }
-
-  private Set<Integer> extractContainerIds(String auroraOutputStr) {
-    String pattern = "Querying instance statuses: [";
-    int idx1 = auroraOutputStr.indexOf(pattern);
-    if (idx1 < 0) { // no container was added
-      LOG.info("stdout & stderr by Aurora " + auroraOutputStr);
-      return new HashSet<Integer>();
-    }
-    idx1 += pattern.length();
-    int idx2 = auroraOutputStr.indexOf("]", idx1);
-    String containerIdStr = auroraOutputStr.substring(idx1, idx2);
-    LOG.info("container IDs returned by Aurora " + containerIdStr);
-    return Arrays.asList(containerIdStr.split(", "))
-        .stream().map(x->Integer.valueOf(x)).collect(Collectors.toSet());
-  }
-
-  // Utils method for unit tests
-  @VisibleForTesting
-  boolean runProcess(List<String> auroraCmd, StringBuilder stdout, StringBuilder stderr) {
-    int status =
-        ShellUtils.runProcess(auroraCmd.toArray(new String[auroraCmd.size()]),
-            stderr != null ? stderr : new StringBuilder());
-
-    if (status != 0) {
-      LOG.severe(String.format(
-          "Failed to run process. Command=%s, STDOUT=%s, STDERR=%s", auroraCmd, stdout, stderr));
-    }
-    return status == 0;
-  }
-
-  // Utils method for unit tests
-  @VisibleForTesting
-  boolean runProcess(List<String> auroraCmd) {
-    return runProcess(auroraCmd, null, null);
-  }
-
-  private static String getInstancesIdsToKill(Set<PackingPlan.ContainerPlan> containersToRemove) {
-    StringBuilder ids = new StringBuilder();
-    for (PackingPlan.ContainerPlan containerPlan : containersToRemove) {
-      if (ids.length() > 0) {
-        ids.append(",");
-      }
-      ids.append(containerPlan.getId());
-    }
-    return ids.toString();
-  }
-
-  // Static method to append verbose and batching options if needed
-  private static void appendAuroraCommandOptions(List<String> auroraCmd, boolean isVerbose) {
-    // Append verbose if needed
-    if (isVerbose) {
-      auroraCmd.add("--verbose");
-    }
-
-    // Append batch size.
-    // Note that we can not use "--no-batching" since "restart" command does not accept it.
-    // So we play a small trick here by setting batch size Integer.MAX_VALUE.
-    auroraCmd.add("--batch-size");
-    auroraCmd.add(Integer.toString(Integer.MAX_VALUE));
-  }
-}
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraContext.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraContext.java
deleted file mode 100644
index d48cf8d7580..00000000000
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraContext.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-import java.io.File;
-
-import org.apache.heron.spi.common.Config;
-import org.apache.heron.spi.common.Context;
-
-public final class AuroraContext extends Context {
-  public static final String JOB_LINK_TEMPLATE = "heron.scheduler.job.link.template";
-  public static final String JOB_TEMPLATE = "heron.scheduler.job.template";
-  public static final String JOB_MAX_KILL_ATTEMPTS = "heron.scheduler.job.max.kill.attempts";
-  public static final String JOB_KILL_RETRY_INTERVAL_MS =
-      "heron.scheduler.job.kill.retry.interval.ms";
-
-  private AuroraContext() {
-  }
-
-  public static String getJobLinkTemplate(Config config) {
-    return config.getStringValue(JOB_LINK_TEMPLATE);
-  }
-
-  public static String getHeronAuroraPath(Config config) {
-    return config.getStringValue(JOB_TEMPLATE,
-        new File(Context.heronConf(config), "heron.aurora").getPath());
-  }
-
-  public static int getJobMaxKillAttempts(Config config) {
-    return config.getIntegerValue(JOB_MAX_KILL_ATTEMPTS, 5);
-  }
-
-  public static long getJobKillRetryIntervalMs(Config config) {
-    return config.getLongValue(JOB_KILL_RETRY_INTERVAL_MS, 2000);
-  }
-}
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraController.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraController.java
deleted file mode 100644
index a70333908a0..00000000000
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraController.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.heron.spi.packing.PackingPlan;
-
-/**
- * Interface that defines how a client interacts with aurora to control the job lifecycle
- */
-public interface AuroraController {
-
-  boolean createJob(Map<AuroraField, String> auroraProperties, Map<String, String> extra);
-  boolean killJob();
-
-  /**
-   * Restarts a given container, or the entire job if containerId is null
-   * @param containerId ID of container to restart, or entire job if null
-   * @return the boolean return value
-   */
-  boolean restart(Integer containerId);
-
-  void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove);
-  Set<Integer> addContainers(Integer count);
-}
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraField.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraField.java
deleted file mode 100644
index cc04ec10c98..00000000000
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraField.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-/**
- * Field names passed to aurora controllers during job creation
- */
-public enum AuroraField {
-  CLUSTER,
-  CORE_PACKAGE_URI,
-  CPUS_PER_CONTAINER,
-  DISK_PER_CONTAINER,
-  ENVIRON,
-  EXECUTOR_BINARY,
-  NUM_CONTAINERS,
-  RAM_PER_CONTAINER,
-  ROLE,
-  TIER,
-  TOPOLOGY_ARGUMENTS,
-  TOPOLOGY_NAME,
-  TOPOLOGY_PACKAGE_URI
-}
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraHeronShellController.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraHeronShellController.java
deleted file mode 100644
index aa6c708ca7a..00000000000
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraHeronShellController.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-import java.net.HttpURLConnection;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Logger;
-
-import org.apache.heron.proto.system.PhysicalPlans.StMgr;
-import org.apache.heron.spi.common.Config;
-import org.apache.heron.spi.common.Context;
-import org.apache.heron.spi.packing.PackingPlan;
-import org.apache.heron.spi.statemgr.IStateManager;
-import org.apache.heron.spi.statemgr.SchedulerStateManagerAdaptor;
-import org.apache.heron.spi.utils.NetworkUtils;
-import org.apache.heron.spi.utils.ReflectionUtils;
-
-/**
- * Implementation of AuroraController that is a wrapper of AuroraCLIController.
- * The difference is `restart` command:
- * 1. restart whole topology: delegate to AuroraCLIController
- * 2. restart container 0: delegate to AuroraCLIController
- * 3. restart container x(x>0): call heron-shell endpoint `/killexecutor`
- * For backpressure, only containers with heron-stmgr may send out backpressure.
- * This class is to handle `restart backpressure containers inside container`,
- * while delegating to AuroraCLIController for all the other scenarios.
- */
-class AuroraHeronShellController implements AuroraController {
-  private static final Logger LOG = Logger.getLogger(AuroraHeronShellController.class.getName());
-
-  private final String topologyName;
-  private final AuroraCLIController cliController;
-  private final SchedulerStateManagerAdaptor stateMgrAdaptor;
-
-  AuroraHeronShellController(String jobName, String cluster, String role, String env,
-      String auroraFilename, boolean isVerbose, Config localConfig)
-      throws ClassNotFoundException, InstantiationException, IllegalAccessException {
-    this.topologyName = jobName;
-    this.cliController =
-        new AuroraCLIController(jobName, cluster, role, env, auroraFilename, isVerbose);
-
-    Config config = Config.toClusterMode(localConfig);
-    String stateMgrClass = Context.stateManagerClass(config);
-    IStateManager stateMgr = ReflectionUtils.newInstance(stateMgrClass);
-    stateMgr.initialize(config);
-    stateMgrAdaptor = new SchedulerStateManagerAdaptor(stateMgr, 5000);
-  }
-
-  @Override
-  public boolean createJob(Map<AuroraField, String> bindings, Map<String, String> extra) {
-    return cliController.createJob(bindings, extra);
-  }
-
-  @Override
-  public boolean killJob() {
-    return cliController.killJob();
-  }
-
-  private StMgr searchContainer(Integer id) {
-    String prefix = "stmgr-" + id;
-    for (StMgr sm : stateMgrAdaptor.getPhysicalPlan(topologyName).getStmgrsList()) {
-      if (sm.getId().equals(prefix)) {
-        return sm;
-      }
-    }
-    return null;
-  }
-
-  // Restart an aurora container
-  @Override
-  public boolean restart(Integer containerId) {
-    // there is no backpressure for container 0, delegate to aurora client
-    if (containerId == null || containerId == 0) {
-      return cliController.restart(containerId);
-    }
-
-    if (stateMgrAdaptor == null) {
-      LOG.warning("SchedulerStateManagerAdaptor not initialized");
-      return false;
-    }
-
-    StMgr sm = searchContainer(containerId);
-    if (sm == null) {
-      LOG.warning("container not found in pplan " + containerId);
-      return false;
-    }
-
-    String url = "http://" + sm.getHostName() + ":" + sm.getShellPort() + "/killexecutor";
-    String payload = "secret=" + stateMgrAdaptor.getExecutionState(topologyName).getTopologyId();
-    LOG.info("sending `kill container` to " + url + "; payload: " + payload);
-
-    HttpURLConnection con = NetworkUtils.getHttpConnection(url);
-    try {
-      if (NetworkUtils.sendHttpPostRequest(con, "X", payload.getBytes())) {
-        return NetworkUtils.checkHttpResponseCode(con, 200);
-      } else { // if heron-shell command fails, delegate to aurora client
-        LOG.info("heron-shell killexecutor failed; try aurora client ..");
-        return cliController.restart(containerId);
-      }
-    } finally {
-      con.disconnect();
-    }
-  }
-
-  @Override
-  public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove) {
-    cliController.removeContainers(containersToRemove);
-  }
-
-  @Override
-  public Set<Integer> addContainers(Integer count) {
-    return cliController.addContainers(count);
-  }
-}
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraLauncher.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraLauncher.java
deleted file mode 100644
index 023fcd98626..00000000000
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraLauncher.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-import org.apache.heron.scheduler.utils.LauncherUtils;
-import org.apache.heron.spi.common.Config;
-import org.apache.heron.spi.packing.PackingPlan;
-import org.apache.heron.spi.scheduler.ILauncher;
-import org.apache.heron.spi.scheduler.IScheduler;
-
-/**
- * Launch topology locally to Aurora.
- */
-
-public class AuroraLauncher implements ILauncher {
-  private Config config;
-  private Config runtime;
-
-  @Override
-  public void initialize(Config mConfig, Config mRuntime) {
-    this.config = mConfig;
-    this.runtime = mRuntime;
-  }
-
-  @Override
-  public void close() {
-
-  }
-
-  @Override
-  public boolean launch(PackingPlan packing) {
-    LauncherUtils launcherUtils = LauncherUtils.getInstance();
-    Config ytruntime = launcherUtils.createConfigWithPackingDetails(runtime, packing);
-    return launcherUtils.onScheduleAsLibrary(config, ytruntime, getScheduler(), packing);
-  }
-
-  // Get AuroraScheduler
-  protected IScheduler getScheduler() {
-    return new AuroraScheduler();
-  }
-}
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraScheduler.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraScheduler.java
deleted file mode 100644
index 9c386f22a95..00000000000
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraScheduler.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Scanner;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.google.common.base.Optional;
-
-import org.apache.heron.api.generated.TopologyAPI;
-import org.apache.heron.api.utils.TopologyUtils;
-import org.apache.heron.api.utils.Utils;
-import org.apache.heron.proto.scheduler.Scheduler;
-import org.apache.heron.scheduler.UpdateTopologyManager;
-import org.apache.heron.scheduler.utils.Runtime;
-import org.apache.heron.scheduler.utils.SchedulerUtils;
-import org.apache.heron.spi.common.Config;
-import org.apache.heron.spi.common.Context;
-import org.apache.heron.spi.common.Key;
-import org.apache.heron.spi.common.TokenSub;
-import org.apache.heron.spi.packing.PackingPlan;
-import org.apache.heron.spi.packing.Resource;
-import org.apache.heron.spi.scheduler.IScalable;
-import org.apache.heron.spi.scheduler.IScheduler;
-
-public class AuroraScheduler implements IScheduler, IScalable {
-  private static final Logger LOG = Logger.getLogger(AuroraLauncher.class.getName());
-
-  private Config config;
-  private Config runtime;
-  private AuroraController controller;
-  private UpdateTopologyManager updateTopologyManager;
-
-  @Override
-  public void initialize(Config mConfig, Config mRuntime) {
-    this.config = Config.toClusterMode(mConfig);
-    this.runtime = mRuntime;
-    try {
-      this.controller = getController();
-    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
-      LOG.severe("AuroraController initialization failed " + e.getMessage());
-    }
-    this.updateTopologyManager =
-        new UpdateTopologyManager(config, runtime, Optional.<IScalable>of(this));
-  }
-
-  /**
-   * Get an AuroraController based on the config and runtime
-   *
-   * @return AuroraController
-   */
-  protected AuroraController getController()
-      throws ClassNotFoundException, InstantiationException, IllegalAccessException {
-    Boolean cliController = config.getBooleanValue(Key.AURORA_CONTROLLER_CLASS);
-    Config localConfig = Config.toLocalMode(this.config);
-    if (cliController) {
-      return new AuroraCLIController(
-          Runtime.topologyName(runtime),
-          Context.cluster(localConfig),
-          Context.role(localConfig),
-          Context.environ(localConfig),
-          AuroraContext.getHeronAuroraPath(localConfig),
-          Context.verbose(localConfig));
-    } else {
-      return new AuroraHeronShellController(
-          Runtime.topologyName(runtime),
-          Context.cluster(localConfig),
-          Context.role(localConfig),
-          Context.environ(localConfig),
-          AuroraContext.getHeronAuroraPath(localConfig),
-          Context.verbose(localConfig),
-          localConfig);
-    }
-  }
-
-  @Override
-  public void close() {
-    if (updateTopologyManager != null) {
-      updateTopologyManager.close();
-    }
-  }
-
-  @Override
-  public boolean onSchedule(PackingPlan packing) {
-    if (packing == null || packing.getContainers().isEmpty()) {
-      LOG.severe("No container requested. Can't schedule");
-      return false;
-    }
-
-    LOG.info("Launching topology in aurora");
-
-    // Align the cpu, RAM, disk to the maximal one, and set them to ScheduledResource
-    PackingPlan updatedPackingPlan = packing.cloneWithHomogeneousScheduledResource();
-    SchedulerUtils.persistUpdatedPackingPlan(Runtime.topologyName(runtime), updatedPackingPlan,
-        Runtime.schedulerStateManagerAdaptor(runtime));
-
-    // Use the ScheduledResource to create aurora properties
-    // the ScheduledResource is guaranteed to be set after calling
-    // cloneWithHomogeneousScheduledResource in the above code
-    Resource containerResource =
-        updatedPackingPlan.getContainers().iterator().next().getScheduledResource().get();
-    Map<AuroraField, String> auroraProperties = createAuroraProperties(containerResource);
-    Map<String, String> extraProperties = createExtraProperties(containerResource);
-
-    return controller.createJob(auroraProperties, extraProperties);
-  }
-
-  @Override
-  public List<String> getJobLinks() {
-    List<String> jobLinks = new ArrayList<>();
-
-    //Only the aurora job page is returned
-    String jobLinkFormat = AuroraContext.getJobLinkTemplate(config);
-    if (jobLinkFormat != null && !jobLinkFormat.isEmpty()) {
-      String jobLink = TokenSub.substitute(config, jobLinkFormat);
-      jobLinks.add(jobLink);
-    }
-
-    return jobLinks;
-  }
-
-  @Override
-  public boolean onKill(Scheduler.KillTopologyRequest request) {
-    // The aurora service can be unavailable or unstable for a while,
-    // we will try to kill the job with multiple attempts
-    int attempts = AuroraContext.getJobMaxKillAttempts(config);
-    long retryIntervalMs = AuroraContext.getJobKillRetryIntervalMs(config);
-    LOG.info("Will try " + attempts + " attempts at interval: " + retryIntervalMs + " ms");
-
-    // First attempt
-    boolean res = controller.killJob();
-    attempts--;
-
-    // Failure retry
-    while (!res && attempts > 0) {
-      LOG.warning("Failed to kill the topology. Will retry in " + retryIntervalMs + " ms...");
-      Utils.sleep(retryIntervalMs);
-
-      // Retry the killJob()
-      res = controller.killJob();
-      attempts--;
-    }
-
-    return res;
-  }
-
-  @Override
-  public boolean onRestart(Scheduler.RestartTopologyRequest request) {
-    Integer containerId = null;
-    if (request.getContainerIndex() != -1) {
-      containerId = request.getContainerIndex();
-    }
-    return controller.restart(containerId);
-  }
-
-  @Override
-  public boolean onUpdate(Scheduler.UpdateTopologyRequest request) {
-    try {
-      updateTopologyManager.updateTopology(
-          request.getCurrentPackingPlan(), request.getProposedPackingPlan());
-    } catch (ExecutionException | InterruptedException e) {
-      LOG.log(Level.SEVERE, "Could not update topology for request: " + request, e);
-      return false;
-    }
-    return true;
-  }
-
-  private static final String CONFIRMED_YES = "y";
-  boolean hasConfirmedWithUser(int newContainerCount) {
-    LOG.info(String.format("After update there will be %d more containers. "
-        + "Please make sure there are sufficient resources to update this job. "
-        + "Continue update? [y/N]: ", newContainerCount));
-    Scanner scanner = new Scanner(System.in);
-    String userInput = scanner.nextLine();
-    return CONFIRMED_YES.equalsIgnoreCase(userInput);
-  }
-
-  @Override
-  public Set<PackingPlan.ContainerPlan> addContainers(
-      Set<PackingPlan.ContainerPlan> containersToAdd) {
-    Set<PackingPlan.ContainerPlan> remapping = new HashSet<>();
-    if ("prompt".equalsIgnoreCase(Context.updatePrompt(config))
-        && !hasConfirmedWithUser(containersToAdd.size())) {
-      LOG.warning("Scheduler updated topology canceled.");
-      return remapping;
-    }
-
-    // Do the actual containers adding
-    LinkedList<Integer> newAddedContainerIds = new LinkedList<>(
-        controller.addContainers(containersToAdd.size()));
-    if (newAddedContainerIds.size() != containersToAdd.size()) {
-      throw new RuntimeException(
-          "Aurora returned different container count " + newAddedContainerIds.size()
-          + "; input count was " + containersToAdd.size());
-    }
-    // Do the remapping:
-    // use the `newAddedContainerIds` to replace the container id in the `containersToAdd`
-    for (PackingPlan.ContainerPlan cp : containersToAdd) {
-      PackingPlan.ContainerPlan newContainerPlan =
-          new PackingPlan.ContainerPlan(
-              newAddedContainerIds.pop(), cp.getInstances(),
-              cp.getRequiredResource(), cp.getScheduledResource().orNull());
-      remapping.add(newContainerPlan);
-    }
-    LOG.info("The remapping structure: " + remapping);
-    return remapping;
-  }
-
-  @Override
-  public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove) {
-    controller.removeContainers(containersToRemove);
-  }
-
-  protected Map<AuroraField, String> createAuroraProperties(Resource containerResource) {
-    Map<AuroraField, String> auroraProperties = new HashMap<>();
-
-    TopologyAPI.Topology topology = Runtime.topology(runtime);
-
-    auroraProperties.put(AuroraField.EXECUTOR_BINARY,
-        Context.executorBinary(config));
-
-    List<String> topologyArgs = new ArrayList<>();
-    SchedulerUtils.addExecutorTopologyArgs(topologyArgs, config, runtime);
-    String args = String.join(" ", topologyArgs);
-    auroraProperties.put(AuroraField.TOPOLOGY_ARGUMENTS, args);
-
-    auroraProperties.put(AuroraField.CLUSTER, Context.cluster(config));
-    auroraProperties.put(AuroraField.ENVIRON, Context.environ(config));
-    auroraProperties.put(AuroraField.ROLE, Context.role(config));
-    auroraProperties.put(AuroraField.TOPOLOGY_NAME, topology.getName());
-
-    auroraProperties.put(AuroraField.CPUS_PER_CONTAINER,
-        Double.toString(containerResource.getCpu()));
-    auroraProperties.put(AuroraField.DISK_PER_CONTAINER,
-        Long.toString(containerResource.getDisk().asBytes()));
-    auroraProperties.put(AuroraField.RAM_PER_CONTAINER,
-        Long.toString(containerResource.getRam().asBytes()));
-
-    auroraProperties.put(AuroraField.NUM_CONTAINERS,
-        Integer.toString(1 + TopologyUtils.getNumContainers(topology)));
-
-    // Job configuration attribute 'production' is deprecated.
-    // Use 'tier' attribute instead
-    // See: http://aurora.apache.org/documentation/latest/reference/configuration/#job-objects
-    if ("prod".equals(Context.environ(config))) {
-      auroraProperties.put(AuroraField.TIER, "preferred");
-    } else {
-      auroraProperties.put(AuroraField.TIER, "preemptible");
-    }
-
-    String heronCoreReleasePkgURI = Context.corePackageUri(config);
-    String topologyPkgURI = Runtime.topologyPackageUri(runtime).toString();
-
-    auroraProperties.put(AuroraField.CORE_PACKAGE_URI, heronCoreReleasePkgURI);
-    auroraProperties.put(AuroraField.TOPOLOGY_PACKAGE_URI, topologyPkgURI);
-
-    return auroraProperties;
-  }
-
-  protected Map<String, String> createExtraProperties(Resource containerResource) {
-    Map<String, String> extraProperties = new HashMap<>();
-
-    if (config.containsKey(Key.SCHEDULER_PROPERTIES)) {
-      String[] meta = config.getStringValue(Key.SCHEDULER_PROPERTIES).split(",");
-      extraProperties.put(AuroraContext.JOB_TEMPLATE, meta[0]);
-      for (int idx = 1; idx < meta.length; idx++) {
-        extraProperties.put("AURORA_METADATA_" + idx, meta[idx]);
-      }
-    }
-
-    return extraProperties;
-  }
-}
diff --git a/heron/schedulers/tests/java/BUILD b/heron/schedulers/tests/java/BUILD
index 3ad9ffedf36..f3122052f0c 100644
--- a/heron/schedulers/tests/java/BUILD
+++ b/heron/schedulers/tests/java/BUILD
@@ -25,9 +25,6 @@ scheduler_deps_files = \
     common_deps_files + \
     spi_deps_files
 
-aurora_deps_files = [
-    "//heron/schedulers/src/java:aurora-scheduler-java",
-]
 
 yarn_deps_files = [
     "//heron/packing/src/java:roundrobin-packing",
@@ -89,22 +86,6 @@ nomad_deps_files = \
         "//heron/schedulers/src/java:scheduler-utils-java",
     ]
 
-java_library(
-    name = "aurora-tests",
-    srcs = glob(["**/aurora/*.java"]),
-    deps = scheduler_deps_files + aurora_deps_files + ["@maven//:commons_cli_commons_cli"],
-)
-
-java_tests(
-    size = "small",
-    test_classes = [
-        "org.apache.heron.scheduler.aurora.AuroraSchedulerTest",
-        "org.apache.heron.scheduler.aurora.AuroraLauncherTest",
-        "org.apache.heron.scheduler.aurora.AuroraCLIControllerTest",
-        "org.apache.heron.scheduler.aurora.AuroraContextTest",
-    ],
-    runtime_deps = [":aurora-tests"],
-)
 
 java_library(
     name = "yarn-tests",
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraCLIControllerTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraCLIControllerTest.java
deleted file mode 100644
index 4fad3998915..00000000000
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraCLIControllerTest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import org.apache.heron.spi.packing.PackingPlan;
-import org.apache.heron.spi.utils.PackingTestUtils;
-
-public class AuroraCLIControllerTest {
-  private static final String JOB_NAME = "jobName";
-  private static final String CLUSTER = "cluster";
-  private static final String ROLE = "role";
-  private static final String ENV = "gz";
-  private static final String AURORA_FILENAME = "file.aurora";
-  private static final String VERBOSE_CONFIG = "--verbose";
-  private static final String BATCH_CONFIG = "--batch-size";
-  private static final String JOB_SPEC = String.format("%s/%s/%s/%s", CLUSTER, ROLE, ENV, JOB_NAME);
-  private static final boolean IS_VERBOSE = true;
-
-  private AuroraCLIController controller;
-
-  @BeforeClass
-  public static void beforeClass() throws Exception {
-
-  }
-
-  @AfterClass
-  public static void afterClass() throws Exception {
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    controller = Mockito.spy(
-        new AuroraCLIController(JOB_NAME, CLUSTER, ROLE, ENV, AURORA_FILENAME, IS_VERBOSE));
-  }
-
-  @After
-  public void after() throws Exception {
-  }
-
-  @Test
-  public void testCreateJob() throws Exception {
-    Map<AuroraField, String> bindings = new HashMap<>();
-    Map<String, String> bindings2 = new HashMap<>();
-    List<String> expectedCommand = asList("aurora job create --wait-until RUNNING %s %s %s",
-        JOB_SPEC, AURORA_FILENAME, VERBOSE_CONFIG);
-
-    // Failed
-    Mockito.doReturn(false).when(controller).runProcess(Matchers.anyListOf(String.class));
-    Assert.assertFalse(controller.createJob(bindings, bindings2));
-    Mockito.verify(controller).runProcess(Mockito.eq(expectedCommand));
-
-    // Happy path
-    Mockito.doReturn(true).when(controller).runProcess(Matchers.anyListOf(String.class));
-    Assert.assertTrue(controller.createJob(bindings, bindings2));
-    Mockito.verify(controller, Mockito.times(2)).runProcess(expectedCommand);
-  }
-
-  @Test
-  public void testKillJob() throws Exception {
-    List<String> expectedCommand = asList("aurora job killall %s %s %s %d",
-        JOB_SPEC, VERBOSE_CONFIG, BATCH_CONFIG, Integer.MAX_VALUE);
-
-    // Failed
-    Mockito.doReturn(false).when(controller).runProcess(Matchers.anyListOf(String.class));
-    Assert.assertFalse(controller.killJob());
-    Mockito.verify(controller).runProcess(Mockito.eq(expectedCommand));
-
-    // Happy path
-    Mockito.doReturn(true).when(controller).runProcess(Matchers.anyListOf(String.class));
-    Assert.assertTrue(controller.killJob());
-    Mockito.verify(controller, Mockito.times(2)).runProcess(expectedCommand);
-  }
-
-  @Test
-  public void testRestartJob() throws Exception {
-    int containerId = 1;
-    List<String> expectedCommand = asList("aurora job restart %s/%s %s %s %d",
-        JOB_SPEC, containerId, VERBOSE_CONFIG, BATCH_CONFIG, Integer.MAX_VALUE);
-
-    // Failed
-    Mockito.doReturn(false).when(controller).runProcess(Matchers.anyListOf(String.class));
-    Assert.assertFalse(controller.restart(containerId));
-    Mockito.verify(controller).runProcess(Mockito.eq(expectedCommand));
-
-    // Happy path
-    Mockito.doReturn(true).when(controller).runProcess(Matchers.anyListOf(String.class));
-    Assert.assertTrue(controller.restart(containerId));
-    Mockito.verify(controller, Mockito.times(2)).runProcess(expectedCommand);
-  }
-
-  @Test
-  public void testRemoveContainers() {
-    class ContainerPlanComparator implements Comparator<PackingPlan.ContainerPlan> {
-      @Override
-      public int compare(PackingPlan.ContainerPlan o1, PackingPlan.ContainerPlan o2) {
-        return ((Integer) o1.getId()).compareTo(o2.getId());
-      }
-    }
-    SortedSet<PackingPlan.ContainerPlan> containers = new TreeSet<>(new ContainerPlanComparator());
-    containers.add(PackingTestUtils.testContainerPlan(3));
-    containers.add(PackingTestUtils.testContainerPlan(5));
-
-    List<String> expectedCommand = asList("aurora job kill %s/3,5 %s %s %d",
-        JOB_SPEC, VERBOSE_CONFIG, BATCH_CONFIG, Integer.MAX_VALUE);
-
-    Mockito.doReturn(true).when(controller).runProcess(Matchers.anyListOf(String.class));
-    controller.removeContainers(containers);
-    Mockito.verify(controller).runProcess(Mockito.eq(expectedCommand));
-  }
-
-  @Test
-  public void testAddContainers() {
-    Integer containersToAdd = 3;
-    List<String> expectedCommand = asList(
-        "aurora job add --wait-until RUNNING %s/0 %s %s",
-        JOB_SPEC, containersToAdd.toString(), VERBOSE_CONFIG);
-
-    Mockito.doAnswer(new Answer<Boolean>() {
-      @Override
-      public Boolean answer(InvocationOnMock arg0) throws Throwable {
-        final StringBuilder originalArgument = (StringBuilder) (arg0.getArguments())[2];
-        originalArgument.append("Querying instance statuses: [1, 2, 3]");
-        return true;
-      }
-    }).when(controller).runProcess(
-                    Matchers.anyListOf(String.class),
-                    Matchers.any(StringBuilder.class),
-                    Matchers.any(StringBuilder.class));
-    Set<Integer> ret = controller.addContainers(containersToAdd);
-    Assert.assertEquals(containersToAdd.intValue(), ret.size());
-    Mockito.verify(controller)
-        .runProcess(Matchers.eq(expectedCommand), Matchers.any(), Matchers.any());
-  }
-
-  @Test
-  public void testAddContainersFailure() {
-    Integer containersToAdd = 3;
-    List<String> expectedCommand = asList(
-        "aurora job add --wait-until RUNNING %s/0 %s %s",
-        JOB_SPEC, containersToAdd.toString(), VERBOSE_CONFIG);
-
-    Mockito.doAnswer(new Answer<Boolean>() {
-      @Override
-      public Boolean answer(InvocationOnMock arg0) throws Throwable {
-        final StringBuilder originalArgument = (StringBuilder) (arg0.getArguments())[2];
-        originalArgument.append("Querying instance statuses: x");
-        return true;
-      }
-    }).when(controller).runProcess(
-                    Matchers.anyListOf(String.class),
-                    Matchers.any(StringBuilder.class),
-                    Matchers.any(StringBuilder.class));
-    Set<Integer> ret = controller.addContainers(containersToAdd);
-    Assert.assertEquals(0, ret.size());
-    Mockito.verify(controller)
-        .runProcess(Matchers.eq(expectedCommand), Matchers.any(), Matchers.any());
-  }
-
-  private static List<String> asList(String command, Object... values) {
-    return new ArrayList<>(Arrays.asList(String.format(command, values).split(" ")));
-  }
-}
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraContextTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraContextTest.java
deleted file mode 100644
index 4f4017f41a7..00000000000
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraContextTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.heron.spi.common.Config;
-import org.apache.heron.spi.common.Key;
-
-public class AuroraContextTest {
-
-  @Test
-  public void testUsesConfigString() {
-    final String auroraTemplate = "/dir/test.aurora";
-    Config config = Config.newBuilder()
-        .put(AuroraContext.JOB_TEMPLATE, auroraTemplate)
-        .put(Key.HERON_CONF, "/test")
-        .build();
-    Assert.assertEquals("Expected to use value from JOB_TEMPLATE config",
-        auroraTemplate, AuroraContext.getHeronAuroraPath(config));
-  }
-
-  @Test
-  public void testFallback() {
-    Config config = Config.newBuilder()
-        .put(Key.HERON_CONF, "/test")
-        .build();
-    Assert.assertEquals("Expected to use heron_conf/heron.aurora", "/test/heron.aurora",
-        AuroraContext.getHeronAuroraPath(config));
-  }
-}
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraLauncherTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraLauncherTest.java
deleted file mode 100644
index 76e94a5d9e9..00000000000
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraLauncherTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import org.apache.heron.scheduler.utils.LauncherUtils;
-import org.apache.heron.spi.common.Config;
-import org.apache.heron.spi.packing.PackingPlan;
-import org.apache.heron.spi.scheduler.IScheduler;
-
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore("jdk.internal.reflect.*")
-@PrepareForTest(LauncherUtils.class)
-public class AuroraLauncherTest {
-  @Test
-  public void testLaunch() throws Exception {
-    Config config = Config.newBuilder().build();
-    AuroraLauncher launcher = Mockito.spy(AuroraLauncher.class);
-    launcher.initialize(config, config);
-
-    LauncherUtils mockLauncherUtils = Mockito.mock(LauncherUtils.class);
-    PowerMockito.spy(LauncherUtils.class);
-    PowerMockito.doReturn(mockLauncherUtils).when(LauncherUtils.class, "getInstance");
-
-    // Failed to schedule
-    Mockito.when(mockLauncherUtils.onScheduleAsLibrary(
-        Mockito.any(Config.class),
-        Mockito.any(Config.class),
-        Mockito.any(IScheduler.class),
-        Mockito.any(PackingPlan.class))).thenReturn(false);
-
-    Assert.assertFalse(launcher.launch(Mockito.mock(PackingPlan.class)));
-    Mockito.verify(mockLauncherUtils).onScheduleAsLibrary(
-        Mockito.any(Config.class),
-        Mockito.any(Config.class),
-        Mockito.any(IScheduler.class),
-        Mockito.any(PackingPlan.class));
-
-    // Happy path
-    Mockito.when(mockLauncherUtils.onScheduleAsLibrary(
-        Mockito.any(Config.class),
-        Mockito.any(Config.class),
-        Mockito.any(IScheduler.class),
-        Mockito.any(PackingPlan.class))).thenReturn(true);
-
-    Assert.assertTrue(launcher.launch(Mockito.mock(PackingPlan.class)));
-    Mockito.verify(mockLauncherUtils, Mockito.times(2)).onScheduleAsLibrary(
-        Mockito.any(Config.class),
-        Mockito.any(Config.class),
-        Mockito.any(IScheduler.class),
-        Mockito.any(PackingPlan.class));
-
-    launcher.close();
-  }
-}
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraSchedulerTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraSchedulerTest.java
deleted file mode 100644
index a73a7e0676d..00000000000
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraSchedulerTest.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.scheduler.aurora;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.heron.api.generated.TopologyAPI;
-import org.apache.heron.common.basics.ByteAmount;
-import org.apache.heron.common.utils.topology.TopologyTests;
-import org.apache.heron.proto.scheduler.Scheduler;
-import org.apache.heron.proto.system.PackingPlans;
-import org.apache.heron.scheduler.SubmitterMain;
-import org.apache.heron.spi.common.Config;
-import org.apache.heron.spi.common.Key;
-import org.apache.heron.spi.common.TokenSub;
-import org.apache.heron.spi.packing.PackingPlan;
-import org.apache.heron.spi.packing.Resource;
-import org.apache.heron.spi.statemgr.SchedulerStateManagerAdaptor;
-import org.apache.heron.spi.utils.PackingTestUtils;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore("jdk.internal.reflect.*")
-@PrepareForTest({TokenSub.class, Config.class})
-public class AuroraSchedulerTest {
-  private static final String AURORA_PATH = "path.aurora";
-  private static final String PACKING_PLAN_ID = "packing.plan.id";
-  private static final String TOPOLOGY_NAME = "topologyName";
-  private static final int CONTAINER_ID = 7;
-
-  private static AuroraScheduler scheduler;
-
-  @Before
-  public void setUp() throws Exception {
-  }
-
-  @After
-  public void after() throws Exception {
-  }
-
-  @BeforeClass
-  public static void beforeClass() throws Exception {
-    scheduler = Mockito.spy(AuroraScheduler.class);
-    doReturn(new HashMap<String, String>())
-        .when(scheduler).createAuroraProperties(Mockito.any(Resource.class));
-  }
-
-  @AfterClass
-  public static void afterClass() throws Exception {
-    scheduler.close();
-  }
-
-  /**
-   * Tests that we can schedule
-   */
-  @Test
-  public void testOnSchedule() throws Exception {
-    AuroraController controller = Mockito.mock(AuroraController.class);
-    doReturn(controller).when(scheduler).getController();
-
-    SchedulerStateManagerAdaptor stateManager = mock(SchedulerStateManagerAdaptor.class);
-    Config runtime = Mockito.mock(Config.class);
-    when(runtime.get(Key.SCHEDULER_STATE_MANAGER_ADAPTOR)).thenReturn(stateManager);
-    when(runtime.getStringValue(Key.TOPOLOGY_NAME)).thenReturn(TOPOLOGY_NAME);
-
-    Config mConfig = Mockito.mock(Config.class);
-    PowerMockito.mockStatic(Config.class);
-    when(Config.toClusterMode(mConfig)).thenReturn(mConfig);
-
-    when(mConfig.getStringValue(eq(AuroraContext.JOB_TEMPLATE),
-        anyString())).thenReturn(AURORA_PATH);
-
-    scheduler.initialize(mConfig, runtime);
-
-    // Fail to schedule due to null PackingPlan
-    Assert.assertFalse(scheduler.onSchedule(null));
-
-    PackingPlan plan = new PackingPlan(PACKING_PLAN_ID, new HashSet<PackingPlan.ContainerPlan>());
-    assertTrue(plan.getContainers().isEmpty());
-
-    // Fail to schedule due to PackingPlan is empty
-    Assert.assertFalse(scheduler.onSchedule(plan));
-
-    // Construct valid PackingPlan
-    Set<PackingPlan.ContainerPlan> containers = new HashSet<>();
-    containers.add(PackingTestUtils.testContainerPlan(CONTAINER_ID));
-    PackingPlan validPlan = new PackingPlan(PACKING_PLAN_ID, containers);
-
-    // Failed to create job via controller
-    doReturn(false).when(controller)
-        .createJob(Matchers.anyMapOf(AuroraField.class, String.class),
-            Matchers.anyMapOf(String.class, String.class));
-    doReturn(true).when(stateManager)
-        .updatePackingPlan(any(PackingPlans.PackingPlan.class), eq(TOPOLOGY_NAME));
-
-    Assert.assertFalse(scheduler.onSchedule(validPlan));
-
-    Mockito.verify(controller)
-        .createJob(Matchers.anyMapOf(AuroraField.class, String.class),
-            Matchers.anyMapOf(String.class, String.class));
-    Mockito.verify(stateManager)
-        .updatePackingPlan(any(PackingPlans.PackingPlan.class), eq(TOPOLOGY_NAME));
-
-    // Happy path
-    doReturn(true).when(controller)
-        .createJob(Matchers.anyMapOf(AuroraField.class, String.class),
-            Matchers.anyMapOf(String.class, String.class));
-    assertTrue(scheduler.onSchedule(validPlan));
-
-    Mockito.verify(controller, Mockito.times(2))
-        .createJob(Matchers.anyMapOf(AuroraField.class, String.class),
-            Matchers.anyMapOf(String.class, String.class));
-    Mockito.verify(stateManager, Mockito.times(2))
-        .updatePackingPlan(any(PackingPlans.PackingPlan.class), eq(TOPOLOGY_NAME));
-  }
-
-  @Test
-  public void testOnKill() throws Exception {
-    Config mockConfig = Mockito.mock(Config.class);
-    PowerMockito.mockStatic(Config.class);
-    when(Config.toClusterMode(mockConfig)).thenReturn(mockConfig);
-
-    AuroraController controller = Mockito.mock(AuroraController.class);
-    doReturn(controller).when(scheduler).getController();
-    scheduler.initialize(mockConfig, Mockito.mock(Config.class));
-
-    // Failed to kill job via controller
-    doReturn(false).when(controller).killJob();
-    Assert.assertFalse(scheduler.onKill(Scheduler.KillTopologyRequest.getDefaultInstance()));
-    Mockito.verify(controller).killJob();
-
-    // Happy path
-    doReturn(true).when(controller).killJob();
-    assertTrue(scheduler.onKill(Scheduler.KillTopologyRequest.getDefaultInstance()));
-    Mockito.verify(controller, Mockito.times(2)).killJob();
-  }
-
-  @Test
-  public void testOnRestart() throws Exception {
-    Config mockConfig = Mockito.mock(Config.class);
-    PowerMockito.mockStatic(Config.class);
-    when(Config.toClusterMode(mockConfig)).thenReturn(mockConfig);
-
-    AuroraController controller = Mockito.mock(AuroraController.class);
-    doReturn(controller).when(scheduler).getController();
-    scheduler.initialize(mockConfig, Mockito.mock(Config.class));
-
-    // Construct the RestartTopologyRequest
-    int containerToRestart = 1;
-    Scheduler.RestartTopologyRequest restartTopologyRequest =
-        Scheduler.RestartTopologyRequest.newBuilder().
-            setTopologyName(TOPOLOGY_NAME).setContainerIndex(containerToRestart).
-            build();
-
-    // Failed to kill job via controller
-    doReturn(false).when(
-        controller).restart(containerToRestart);
-    Assert.assertFalse(scheduler.onRestart(restartTopologyRequest));
-    Mockito.verify(controller).restart(containerToRestart);
-
-    // Happy path
-    doReturn(true).when(
-        controller).restart(containerToRestart);
-    assertTrue(scheduler.onRestart(restartTopologyRequest));
-    Mockito.verify(controller, Mockito.times(2)).restart(containerToRestart);
-  }
-
-  @Test
-  public void testGetJobLinks() throws Exception {
-    final String JOB_LINK_FORMAT = "http://go/${CLUSTER}/${ROLE}/${ENVIRON}/${TOPOLOGY}";
-    final String SUBSTITUTED_JOB_LINK = "http://go/local/heron/test/test_topology";
-
-    Config mockConfig = Mockito.mock(Config.class);
-    when(mockConfig.getStringValue(AuroraContext.JOB_LINK_TEMPLATE))
-        .thenReturn(JOB_LINK_FORMAT);
-
-    PowerMockito.mockStatic(Config.class);
-    when(Config.toClusterMode(mockConfig)).thenReturn(mockConfig);
-
-    AuroraController controller = Mockito.mock(AuroraController.class);
-    doReturn(controller).when(scheduler).getController();
-    scheduler.initialize(mockConfig, Mockito.mock(Config.class));
-
-    PowerMockito.spy(TokenSub.class);
-    PowerMockito.doReturn(SUBSTITUTED_JOB_LINK)
-        .when(TokenSub.class, "substitute", mockConfig, JOB_LINK_FORMAT);
-
-    List<String> result = scheduler.getJobLinks();
-
-    assertEquals(1, result.size());
-    assertTrue(result.get(0).equals(SUBSTITUTED_JOB_LINK));
-  }
-
-
-  @Test
-  public void testProperties() throws URISyntaxException {
-    TopologyAPI.Topology topology = TopologyTests.createTopology(
-        TOPOLOGY_NAME, new org.apache.heron.api.Config(),
-        "spoutName", "boltName", 1, 1);
-
-    Config runtime = mock(Config.class);
-    when(runtime.get(Key.TOPOLOGY_DEFINITION)).thenReturn(topology);
-    when(runtime.get(Key.TOPOLOGY_PACKAGE_URI)).thenReturn(new URI("http://foo/bar"));
-
-    // This must mimic how SubmitterMain loads configs
-    CommandLine commandLine = mock(CommandLine.class);
-    when(commandLine.getOptionValue("cluster")).thenReturn("some_cluster");
-    when(commandLine.getOptionValue("role")).thenReturn("some_role");
-    when(commandLine.getOptionValue("environment")).thenReturn("some_env");
-    when(commandLine.getOptionValue("heron_home")).thenReturn("/some/heron/home");
-    when(commandLine.getOptionValue("config_path")).thenReturn("/some/config/path");
-    when(commandLine.getOptionValue("topology_package")).thenReturn("jar");
-    when(commandLine.getOptionValue("topology_defn")).thenReturn("/mock/defnFile.defn");
-    when(commandLine.getOptionValue("topology_bin")).thenReturn("binaryFile.jar");
-    Config config = Mockito.spy(SubmitterMain.loadConfig(commandLine, topology));
-
-    AuroraScheduler testScheduler = new AuroraScheduler();
-    testScheduler.initialize(config, runtime);
-    Resource containerResource =
-        new Resource(2.3, ByteAmount.fromGigabytes(2), ByteAmount.fromGigabytes(3));
-    Map<AuroraField, String> properties = testScheduler.createAuroraProperties(containerResource);
-
-    // this part is key, the conf path in the config is absolute to the install dir, but what
-    // aurora properties get below is the relative ./heron-conf path to be used when run remotely
-    assertEquals("Invalid value for key " + Key.HERON_CONF,
-        "/some/config/path", config.getStringValue(Key.HERON_CONF));
-
-    String expectedConf = "./heron-conf";
-    String expectedBin = "./heron-core/bin";
-    String expectedLib = "./heron-core/lib";
-    String expectedDist = "./heron-core/dist";
-    for (AuroraField field : AuroraField.values()) {
-      boolean asserted = false;
-      Object expected = null;
-      Object found = properties.get(field);
-      switch (field) {
-        case CORE_PACKAGE_URI:
-          expected = expectedDist + "/heron-core.tar.gz";
-          break;
-        case CPUS_PER_CONTAINER:
-          expected = Double.valueOf(containerResource.getCpu()).toString();
-          break;
-        case DISK_PER_CONTAINER:
-          expected = Long.valueOf(containerResource.getDisk().asBytes()).toString();
-          break;
-        case RAM_PER_CONTAINER:
-          expected = Long.valueOf(containerResource.getRam().asBytes()).toString();
-          break;
-        case TIER:
-          expected = "preemptible";
-          break;
-        case NUM_CONTAINERS:
-          expected = "2";
-          break;
-        case EXECUTOR_BINARY:
-          expected = expectedBin + "/heron-executor";
-          break;
-        case TOPOLOGY_PACKAGE_URI:
-          expected = "http://foo/bar";
-          break;
-        case TOPOLOGY_ARGUMENTS:
-          expected = "--topology-name=topologyName"
-            + " --topology-id=" + topology.getId()
-            + " --topology-defn-file=defnFile.defn"
-            + " --state-manager-connection=null"
-            + " --state-manager-root=null"
-            + " --state-manager-config-file=" + expectedConf + "/statemgr.yaml"
-            + " --tmanager-binary=" + expectedBin + "/heron-tmanager"
-            + " --stmgr-binary=" + expectedBin + "/heron-stmgr"
-            + " --metrics-manager-classpath=" + expectedLib + "/metricsmgr/*"
-            + " --instance-jvm-opts=\"\""
-            + " --classpath=binaryFile.jar"
-            + " --heron-internals-config-file=" + expectedConf + "/heron_internals.yaml"
-            + " --override-config-file=" + expectedConf + "/override.yaml"
-            + " --component-ram-map=null"
-            + " --component-jvm-opts=\"\""
-            + " --pkg-type=jar"
-            + " --topology-binary-file=binaryFile.jar"
-            + " --heron-java-home=/usr/lib/jvm/default-java"
-            + " --heron-shell-binary=" + expectedBin + "/heron-shell"
-            + " --cluster=some_cluster"
-            + " --role=some_role"
-            + " --environment=some_env"
-            + " --instance-classpath=" + expectedLib + "/instance/*"
-            + " --metrics-sinks-config-file=" + expectedConf + "/metrics_sinks.yaml"
-            + " --scheduler-classpath=" + expectedLib + "/scheduler/*:./heron-core"
-            + "/lib/packing/*:" + expectedLib + "/statemgr/*"
-            + " --python-instance-binary=" + expectedBin + "/heron-python-instance"
-            + " --cpp-instance-binary=" + expectedBin + "/heron-cpp-instance"
-            + " --metricscache-manager-classpath=" + expectedLib + "/metricscachemgr/*"
-            + " --metricscache-manager-mode=disabled"
-            + " --is-stateful=false"
-            + " --checkpoint-manager-classpath=" + expectedLib + "/ckptmgr/*:"
-            + expectedLib + "/statefulstorage/*:"
-            + " --stateful-config-file=" + expectedConf + "/stateful.yaml"
-            + " --checkpoint-manager-ram=1073741824"
-            + " --health-manager-mode=disabled"
-            + " --health-manager-classpath=" + expectedLib + "/healthmgr/*";
-          break;
-        case CLUSTER:
-          expected = "some_cluster";
-          break;
-        case ENVIRON:
-          expected = "some_env";
-          break;
-        case ROLE:
-          expected = "some_role";
-          break;
-        case TOPOLOGY_NAME:
-          expected = "topologyName";
-          break;
-        default:
-          fail(String.format(
-              "Expected value for Aurora field %s not found in test (found=%s)", field, found));
-      }
-      if (!asserted) {
-        assertEquals("Incorrect value found for field " + field, expected, found);
-      }
-      properties.remove(field);
-    }
-
-    assertTrue("The following aurora fields were not set by the scheduler: " + properties,
-        properties.isEmpty());
-  }
-}