You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2017/11/30 02:01:05 UTC
[4/5] asterixdb git commit: [NO ISSUE] Remove asterix-yarn
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6b765f34/asterixdb/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java b/asterixdb/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
deleted file mode 100644
index 8aeb718..0000000
--- a/asterixdb/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
+++ /dev/null
@@ -1,1334 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.asterix.aoya;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.net.InetAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Random;
-import java.util.Vector;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.asterix.common.config.GlobalConfig;
-import org.apache.asterix.common.utils.StorageConstants;
-import org.apache.asterix.event.schema.yarnCluster.Cluster;
-import org.apache.asterix.event.schema.yarnCluster.MasterNode;
-import org.apache.asterix.event.schema.yarnCluster.Node;
-import org.apache.asterix.hyracks.bootstrap.CCApplication;
-import org.apache.asterix.hyracks.bootstrap.NCApplication;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
-import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
-
-public class AsterixApplicationMaster {
-
- static {
- Logger rootLogger = Logger.getRootLogger();
- rootLogger.setLevel(Level.INFO);
- rootLogger.addAppender(new ConsoleAppender(new PatternLayout("%-6r [%p] %c - %m%n")));
- }
-
- private static final Log LOG = LogFactory.getLog(AsterixApplicationMaster.class);
- private static final String CLUSTER_DESC_PATH = "cluster-config.xml";
- private static final String ASTERIX_CONF_NAME = "asterix-configuration.xml";
- private static final String ASTERIX_ZIP_NAME = "asterix-server.zip";
-
- private static final int CC_MEMORY_MBS_DEFAULT = 1024;
- private static final int NC_MEMORY_MBS_DEFAULT = 1536;
- private static final String EXTERNAL_CC_JAVA_OPTS_DEFAULT = "-Xmx" + CC_MEMORY_MBS_DEFAULT + "m";
- private static final String EXTERNAL_NC_JAVA_OPTS_DEFAULT = "-Xmx" + NC_MEMORY_MBS_DEFAULT + "m";
- private static final String OBLITERATOR_CLASSNAME = "org.apache.asterix.aoya.Deleter";
- private static final String HDFS_BACKUP_CLASSNAME = "org.apache.asterix.aoya.HDFSBackup";
- private static final String NC_CLASSNAME = "org.apache.hyracks.control.nc.NCDriver";
- private static final String CC_CLASSNAME = "org.apache.hyracks.control.cc.CCDriver";
- private static final String JAVA_HOME = System.getProperty("java.home");
- private boolean doneAllocating = false;
-
- // Configuration
- private Configuration conf;
-
- // Handle to communicate with the Resource Manager
- private AMRMClientAsync<ContainerRequest> resourceManager;
-
- // Handle to communicate with the Node Manager
- private NMClientAsync nmClientAsync;
- // Listen to process the response from the Node Manager
- private NMCallbackHandler containerListener;
- // Application Attempt Id ( combination of attemptId and fail count )
- private ApplicationAttemptId appAttemptID;
-
- // TODO
- // For status update for clients - yet to be implemented
- // Hostname of the container
- private String appMasterHostname = "";
- // Port on which the app master listens for status updates from clients
- private int appMasterRpcPort = new Random().nextInt(65535 - 49152);
- // Tracking url to which app master publishes info for clients to monitor
- private String appMasterTrackingUrl = "";
-
- // Counter for completed containers ( complete denotes successful or failed )
- private AtomicInteger numCompletedContainers = new AtomicInteger();
- // Allocated container count so that we know how many containers has the RM
- // allocated to us
- private AtomicInteger numAllocatedContainers = new AtomicInteger();
- // Count of failed containers
- private AtomicInteger numFailedContainers = new AtomicInteger();
- // Count of containers already requested from the RM
- // Needed as once requested, we should not request for containers again.
- // Only request for more if the original requirement changes.
- private AtomicInteger numRequestedContainers = new AtomicInteger();
- //Tells us whether the Cluster Controller is up so we can safely start some Node Controllers
- private AtomicBoolean ccUp = new AtomicBoolean();
- private AtomicBoolean ccStarted = new AtomicBoolean();
- private Queue<Node> pendingNCs = new ArrayDeque<>();
-
- //HDFS path to AsterixDB distributable zip
- private String asterixZipPath = "";
- // Timestamp needed for creating a local resource
- private long asterixZipTimestamp = 0;
- // File length needed for local resource
- private long asterixZipLen = 0;
-
- //HDFS path to AsterixDB cluster description
- private String asterixConfPath = "";
- // Timestamp needed for creating a local resource
- private long asterixConfTimestamp = 0;
- // File length needed for local resource
- private long asterixConfLen = 0;
-
- private String instanceConfPath = "";
-
- //base dir under which all configs and binaries lie
- private String dfsBasePath;
-
- private int numTotalContainers = 0;
-
- // Set the local resources
- private Map<String, LocalResource> localResources = new HashMap<>();
-
- private Cluster clusterDesc = null;
- private MasterNode cC = null;
- private String ccJavaOpts = null;
- private int ccMem = 0;
- private String ncJavaOpts = null;
- private int ncMem = 0;
- private volatile boolean done;
- private volatile boolean success;
-
- private boolean obliterate = false;
- private Path appMasterJar = null;
- private boolean backup = false;
- long backupTimestamp;
- String snapName;
- private boolean restore = false;
- private boolean initial = false;
-
- // Launch threads
- private List<Thread> launchThreads = new CopyOnWriteArrayList<>();
-
- public static void main(String[] args) {
-
- boolean result = false;
- try {
-
- AsterixApplicationMaster appMaster = new AsterixApplicationMaster();
- LOG.info("Initializing ApplicationMaster");
- appMaster.setEnvs(appMaster.setArgs(args));
- boolean doRun = appMaster.init();
- if (!doRun) {
- System.exit(0);
- }
- result = appMaster.run();
- } catch (Exception e) {
- LOG.fatal("Error running ApplicationMaster", e);
- System.exit(1);
- }
- if (result) {
- LOG.info("Application Master completed successfully. exiting");
- System.exit(0);
- } else {
- LOG.info("Application Master failed. exiting");
- System.exit(2);
- }
- }
-
- private void dumpOutDebugInfo() {
-
- LOG.info("Dump debug output");
- Map<String, String> envs = System.getenv();
- envs.forEach((key, value) -> {
- LOG.info("System env: key=" + key + ", val=" + value);
- System.out.println("System env: key=" + key + ", val=" + value);
- });
-
- String cmd = "ls -alhLR";
- Runtime run = Runtime.getRuntime();
- Process pr = null;
- try {
- pr = run.exec(cmd);
- pr.waitFor();
-
- BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream()));
- String line = "";
- while ((line = buf.readLine()) != null) {
- LOG.info("System CWD content: " + line);
- System.out.println("System CWD content: " + line);
- }
- buf.close();
- } catch (IOException e) {
- LOG.info(e);
- } catch (InterruptedException e) {
- LOG.info(e);
- }
- }
-
- public AsterixApplicationMaster() {
- // Set up the configuration and RPC
- conf = new YarnConfiguration();
-
- }
-
- public CommandLine setArgs(String[] args) throws ParseException {
- Options opts = new Options();
- opts.addOption("app_attempt_id", true, "App Attempt ID. Not to be used unless for testing purposes");
- opts.addOption("priority", true, "Application Priority. Default 0");
- opts.addOption("debug", false, "Dump out debug information");
- opts.addOption("help", false, "Print usage");
- opts.addOption("initial", false, "Initialize existing Asterix instance.");
- opts.addOption("obliterate", false, "Delete asterix instance completely.");
- opts.addOption("backup", false, "Back up AsterixDB instance");
- opts.addOption("restore", true, "Restore an AsterixDB instance");
-
- CommandLine cliParser = new GnuParser().parse(opts, args);
-
- if (cliParser.hasOption("help")) {
- printUsage(opts);
- }
-
- if (cliParser.hasOption("debug")) {
- dumpOutDebugInfo();
- }
-
- if (cliParser.hasOption("obliterate")) {
- obliterate = true;
- }
- if (cliParser.hasOption("initial")) {
- initial = true;
- }
-
- if (cliParser.hasOption("backup")) {
- backup = true;
- backupTimestamp = System.currentTimeMillis();
- }
- if (cliParser.hasOption("restore")) {
- restore = true;
- snapName = cliParser.getOptionValue("restore");
- LOG.info(snapName);
- }
- return cliParser;
- }
-
- public void setEnvs(CommandLine cliParser) {
- Map<String, String> envs = System.getenv();
- if (envs.containsKey("HADOOP_CONF_DIR")) {
- File hadoopConfDir = new File(envs.get("HADOOP_CONF_DIR"));
- if (hadoopConfDir.isDirectory()) {
- for (File config : hadoopConfDir.listFiles()) {
- if (config.getName().matches("^.*(xml)$")) {
- conf.addResource(new Path(config.getAbsolutePath()));
- }
- }
- }
- }
- //the containerID might be in the arguments or the environment
- if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
- if (cliParser.hasOption("app_attempt_id")) {
- String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
- appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
- } else {
-
- throw new IllegalArgumentException(
- "Environment is not set correctly- please check client submission settings");
- }
- } else {
- ContainerId containerId = ConverterUtils.toContainerId(envs.get(Environment.CONTAINER_ID.name()));
- appAttemptID = containerId.getApplicationAttemptId();
- }
-
- if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV) || !envs.containsKey(Environment.NM_HOST.name())
- || !envs.containsKey(Environment.NM_HTTP_PORT.name())
- || !envs.containsKey(Environment.NM_PORT.name())) {
- throw new IllegalArgumentException(
- "Environment is not set correctly- please check client submission settings");
- }
- System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY,
- envs.get("PWD") + File.separator + "bin" + File.separator + ASTERIX_CONF_NAME);
-
- LOG.info("Application master for app" + ", appId=" + appAttemptID.getApplicationId().getId()
- + ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp() + ", attemptId="
- + appAttemptID.getAttemptId());
-
- asterixZipPath = envs.get(AConstants.TARLOCATION);
- asterixZipTimestamp = Long.parseLong(envs.get(AConstants.TARTIMESTAMP));
- asterixZipLen = Long.parseLong(envs.get(AConstants.TARLEN));
-
- asterixConfPath = envs.get(AConstants.CONFLOCATION);
- asterixConfTimestamp = Long.parseLong(envs.get(AConstants.CONFTIMESTAMP));
- asterixConfLen = Long.parseLong(envs.get(AConstants.CONFLEN));
-
- instanceConfPath = envs.get(AConstants.INSTANCESTORE);
- //the only time this is null is during testing, when asterix-yarn isn't packaged in a JAR yet.
- if (envs.get(AConstants.APPLICATIONMASTERJARLOCATION) != null
- && !envs.get(AConstants.APPLICATIONMASTERJARLOCATION).endsWith(File.separator)) {
- appMasterJar = new Path(envs.get(AConstants.APPLICATIONMASTERJARLOCATION));
- } else {
- appMasterJar = null;
- }
-
- dfsBasePath = envs.get(AConstants.DFS_BASE);
- //If the NM has an odd environment where the proper hadoop XML configs dont get imported, we can end up not being able to talk to the RM
- // this solves that!
- //in a testing environment these can be null however.
- if (envs.get(AConstants.RMADDRESS) != null) {
- conf.set("yarn.resourcemanager.address", envs.get(AConstants.RMADDRESS));
- LOG.info("RM Address: " + envs.get(AConstants.RMADDRESS));
- }
- if (envs.get(AConstants.RMADDRESS) != null) {
- conf.set("yarn.resourcemanager.scheduler.address", envs.get(AConstants.RMSCHEDULERADDRESS));
- }
- ccJavaOpts = envs.get(AConstants.CC_JAVA_OPTS);
- //set defaults if no special given options
- if (ccJavaOpts == null) {
- ccJavaOpts = EXTERNAL_CC_JAVA_OPTS_DEFAULT;
- }
- ncJavaOpts = envs.get(AConstants.NC_JAVA_OPTS);
- if (ncJavaOpts == null) {
- ncJavaOpts = EXTERNAL_NC_JAVA_OPTS_DEFAULT;
- }
-
- LOG.info("Path suffix: " + instanceConfPath);
- }
-
- public boolean init() throws ParseException, IOException, AlgebricksException, YarnException {
- try {
- localizeDFSResources();
- clusterDesc = Utils.parseYarnClusterConfig(CLUSTER_DESC_PATH);
- cC = clusterDesc.getMasterNode();
- appMasterTrackingUrl = "http://" + cC.getClientIp() + ":" + cC.getClientPort() + Path.SEPARATOR;
- distributeAsterixConfig();
- //now let's read what's in there so we can set the JVM opts right
- LOG.debug("config file loc: " + System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY));
- } catch (FileNotFoundException | IllegalStateException e) {
- LOG.error("Could not deserialize Cluster Config from disk- aborting!");
- LOG.error(e);
- throw e;
- }
-
- return true;
- }
-
- /**
- * Sets up the parameters for the Asterix config.
- *
- * @throws IOException
- */
- private void distributeAsterixConfig() throws IOException {
- FileSystem fs = FileSystem.get(conf);
- String pathSuffix = instanceConfPath + File.separator + ASTERIX_CONF_NAME;
- Path dst = new Path(dfsBasePath, pathSuffix);
- URI paramLocation = dst.toUri();
- FileStatus paramFileStatus = fs.getFileStatus(dst);
- Long paramLen = paramFileStatus.getLen();
- Long paramTimestamp = paramFileStatus.getModificationTime();
- LocalResource asterixParamLoc = Records.newRecord(LocalResource.class);
- asterixParamLoc.setType(LocalResourceType.FILE);
- asterixParamLoc.setVisibility(LocalResourceVisibility.PRIVATE);
- asterixParamLoc.setResource(ConverterUtils.getYarnUrlFromURI(paramLocation));
- asterixParamLoc.setTimestamp(paramTimestamp);
- asterixParamLoc.setSize(paramLen);
- localResources.put(ASTERIX_CONF_NAME, asterixParamLoc);
-
- }
-
- /**
- * @param c
- * The cluster exception to attempt to alocate with the RM
- * @throws YarnException
- */
- private void requestResources(Cluster c) throws YarnException, UnknownHostException {
- //set memory
- if (c.getCcContainerMem() != null) {
- ccMem = Integer.parseInt(c.getCcContainerMem());
- } else {
- ccMem = CC_MEMORY_MBS_DEFAULT;
- }
- if (c.getNcContainerMem() != null) {
- ncMem = Integer.parseInt(c.getNcContainerMem());
- } else {
- ncMem = CC_MEMORY_MBS_DEFAULT;
- }
- //request CC
- int numNodes = 0;
- ContainerRequest ccAsk = hostToRequest(cC.getClusterIp(), true);
- resourceManager.addContainerRequest(ccAsk);
- LOG.info("Asked for CC: " + Arrays.toString(ccAsk.getNodes().toArray()));
- numNodes++;
- //now we wait to be given the CC before starting the NCs...
- //we will wait a minute.
- int deathClock = 60;
- while (ccUp.get() == false && deathClock > 0) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ex) {
- LOG.debug(ex);
- }
- --deathClock;
- }
- if (deathClock == 0 && ccUp.get() == false) {
- throw new YarnException("Couldn't allocate container for CC. Abort!");
- }
- LOG.info("Waiting for CC process to start");
- //TODO: inspect for actual liveness instead of waiting.
- // is there a good way to do this? maybe try opening a socket to it...
- try {
- Thread.sleep(10000);
- } catch (InterruptedException ex) {
- LOG.debug(ex);
- }
- //request NCs
- for (Node n : c.getNode()) {
- resourceManager.addContainerRequest(hostToRequest(n.getClusterIp(), false));
- LOG.info("Asked for NC: " + n.getClusterIp());
- numNodes++;
- synchronized (pendingNCs) {
- pendingNCs.add(n);
- }
- }
- LOG.info("Requested all NCs and CCs. Wait for things to settle!");
- numRequestedContainers.set(numNodes);
- numTotalContainers = numNodes;
- doneAllocating = true;
-
- }
-
- /**
- * Asks the RM for a particular host, nicely.
- *
- * @param host
- * The host to request
- * @param cc
- * Whether or not the host is the CC
- * @return A container request that is (hopefully) for the host we asked for.
- */
- private ContainerRequest hostToRequest(String host, boolean cc) throws UnknownHostException {
- InetAddress hostIp = InetAddress.getByName(host);
- Priority pri = Records.newRecord(Priority.class);
- pri.setPriority(0);
- Resource capability = Records.newRecord(Resource.class);
- if (cc) {
- capability.setMemory(ccMem);
- } else {
- capability.setMemory(ncMem);
- }
- //we dont set anything else because we don't care about that and yarn doesn't honor it yet
- String[] hosts = new String[1];
- //TODO this is silly
- hosts[0] = hostIp.getHostName();
- LOG.info("IP addr: " + host + " resolved to " + hostIp.getHostName());
- ContainerRequest request = new ContainerRequest(capability, hosts, null, pri, false);
- LOG.info("Requested host ask: " + request.getNodes());
- return request;
- }
-
- /**
- * Determines whether or not a container is the one on which the CC should reside
- *
- * @param c
- * The container in question
- * @return True if the container should have the CC process on it, false otherwise.
- */
- boolean containerIsCC(Container c) {
- String containerHost = c.getNodeId().getHost();
- try {
- InetAddress containerIp = InetAddress.getByName(containerHost);
- LOG.info(containerIp.getCanonicalHostName());
- InetAddress ccIp = InetAddress.getByName(cC.getClusterIp());
- LOG.info(ccIp.getCanonicalHostName());
- return containerIp.getCanonicalHostName().equals(ccIp.getCanonicalHostName());
- } catch (UnknownHostException e) {
- return false;
- }
- }
-
- /**
- * Attempts to find the Node in the Cluster Description that matches this container
- *
- * @param c
- * The container to resolve
- * @return The node this container corresponds to
- * @throws java.net.UnknownHostException
- * if the container isn't present in the description
- */
- Node containerToNode(Container c, Cluster cl) throws UnknownHostException {
- String containerHost = c.getNodeId().getHost();
- InetAddress containerIp = InetAddress.getByName(containerHost);
- LOG.info("Resolved Container IP: " + containerIp);
- for (Node node : cl.getNode()) {
- InetAddress nodeIp = InetAddress.getByName(node.getClusterIp());
- LOG.info(nodeIp + "?=" + containerIp);
- if (nodeIp.equals(containerIp)) {
- return node;
- }
- }
- //if we find nothing, this is bad...
- throw new java.net.UnknownHostException("Could not resolve container" + containerHost + " to node");
- }
-
- /**
- * Here I am just pointing the Containers to the exisiting HDFS resources given by the Client
- * filesystem of the nodes.
- *
- * @throws IOException
- */
- private void localizeDFSResources() throws IOException {
- //if performing an 'offline' task, skip a lot of resource distribution
- if (obliterate || backup || restore) {
- if (appMasterJar == null || ("").equals(appMasterJar)) {
- //this can happen in a jUnit testing environment. we don't need to set it there.
- if (!conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
- throw new IllegalStateException("AM jar not provided in environment.");
- } else {
- return;
- }
- }
- FileSystem fs = FileSystem.get(conf);
- FileStatus appMasterJarStatus = fs.getFileStatus(appMasterJar);
- LocalResource obliteratorJar = Records.newRecord(LocalResource.class);
- obliteratorJar.setType(LocalResourceType.FILE);
- obliteratorJar.setVisibility(LocalResourceVisibility.PRIVATE);
- obliteratorJar.setResource(ConverterUtils.getYarnUrlFromPath(appMasterJar));
- obliteratorJar.setTimestamp(appMasterJarStatus.getModificationTime());
- obliteratorJar.setSize(appMasterJarStatus.getLen());
- localResources.put("asterix-yarn.jar", obliteratorJar);
- LOG.info(localResources.values());
- return;
- }
- //otherwise, distribute evertything to start up asterix
-
- LocalResource asterixZip = Records.newRecord(LocalResource.class);
-
- //this un-tar's the asterix distribution
- asterixZip.setType(LocalResourceType.ARCHIVE);
-
- asterixZip.setVisibility(LocalResourceVisibility.PRIVATE);
- try {
- asterixZip.setResource(ConverterUtils.getYarnUrlFromURI(new URI(asterixZipPath)));
-
- } catch (URISyntaxException e) {
- LOG.error("Error locating Asterix zip" + " in env, path=" + asterixZipPath);
- throw new IOException(e);
- }
-
- asterixZip.setTimestamp(asterixZipTimestamp);
- asterixZip.setSize(asterixZipLen);
- localResources.put(ASTERIX_ZIP_NAME, asterixZip);
-
- //now let's do the same for the cluster description XML
- LocalResource asterixConf = Records.newRecord(LocalResource.class);
- asterixConf.setType(LocalResourceType.FILE);
-
- asterixConf.setVisibility(LocalResourceVisibility.PRIVATE);
- try {
- asterixConf.setResource(ConverterUtils.getYarnUrlFromURI(new URI(asterixConfPath)));
- } catch (URISyntaxException e) {
- LOG.error("Error locating Asterix config" + " in env, path=" + asterixConfPath);
- throw new IOException(e);
- }
- //TODO: I could avoid localizing this everywhere by only calling this block on the metadata node.
- asterixConf.setTimestamp(asterixConfTimestamp);
- asterixConf.setSize(asterixConfLen);
- localResources.put("cluster-config.xml", asterixConf);
- //now add the libraries if there are any
- try {
- FileSystem fs = FileSystem.get(conf);
- Path p = new Path(dfsBasePath, instanceConfPath + File.separator + "library" + Path.SEPARATOR);
- if (fs.exists(p)) {
- FileStatus[] dataverses = fs.listStatus(p);
- for (FileStatus d : dataverses) {
- if (!d.isDirectory()) {
- throw new IOException("Library configuration directory structure is incorrect");
- }
- FileStatus[] libraries = fs.listStatus(d.getPath());
- for (FileStatus l : libraries) {
- if (l.isDirectory()) {
- throw new IOException("Library configuration directory structure is incorrect");
- }
- LocalResource lr = Records.newRecord(LocalResource.class);
- lr.setResource(ConverterUtils.getYarnUrlFromURI(l.getPath().toUri()));
- lr.setSize(l.getLen());
- lr.setTimestamp(l.getModificationTime());
- lr.setType(LocalResourceType.ARCHIVE);
- lr.setVisibility(LocalResourceVisibility.PRIVATE);
- localResources.put("library" + Path.SEPARATOR + d.getPath().getName() + Path.SEPARATOR
- + l.getPath().getName().split("\\.")[0], lr);
- LOG.info("Found library: " + l.getPath().toString());
- LOG.info(l.getPath().getName());
- }
- }
- }
- } catch (FileNotFoundException e) {
- LOG.info("No external libraries present");
- //do nothing, it just means there aren't libraries. that is possible and ok
- // it should be handled by the fs.exists(p) check though.
- }
- LOG.info(localResources.values());
-
- }
-
- private void printUsage(Options opts) {
- new HelpFormatter().printHelp("ApplicationMaster", opts);
- }
-
- /**
- * Start the AM and request all necessary resources.
- *
- * @return True if the run fully succeeded, false otherwise.
- * @throws YarnException
- * @throws IOException
- */
- public boolean run() throws YarnException, IOException {
- LOG.info("Starting ApplicationMaster");
-
- AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
- resourceManager = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
- resourceManager.init(conf);
- resourceManager.start();
-
- containerListener = new NMCallbackHandler();
- nmClientAsync = new NMClientAsyncImpl(containerListener);
- nmClientAsync.init(conf);
- nmClientAsync.start();
-
- // Register self with ResourceManager
- // This will start heartbeating to the RM
- try {
- appMasterHostname = InetAddress.getLocalHost().toString();
- } catch (java.net.UnknownHostException uhe) {
- appMasterHostname = uhe.toString();
- }
- RegisterApplicationMasterResponse response = resourceManager.registerApplicationMaster(appMasterHostname,
- appMasterRpcPort, appMasterTrackingUrl);
-
- // Dump out information about cluster capability as seen by the
- // resource manager
- int maxMem = response.getMaximumResourceCapability().getMemory();
- LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
-
- try {
- requestResources(clusterDesc);
- } catch (YarnException e) {
- LOG.error("Could not allocate resources properly:" + e.getMessage());
- done = true;
- throw e;
- }
- //now we just sit and listen for messages from the RM
-
- while (!done) {
- try {
- Thread.sleep(200);
- } catch (InterruptedException ex) {
- }
- }
- finish();
- return success;
- }
-
- /**
- * Clean up, whether or not we were successful.
- */
- private void finish() {
- // Join all launched threads
- // needed for when we time out
- // and we need to release containers
- for (Thread launchThread : launchThreads) {
- try {
- launchThread.join(10000);
- } catch (InterruptedException e) {
- LOG.info("Exception thrown in thread join: " + e.getMessage());
- //from https://stackoverflow.com/questions/4812570/how-to-store-printstacktrace-into-a-string
- StringWriter errors = new StringWriter();
- e.printStackTrace(new PrintWriter(errors));
- LOG.error(errors.toString());
- }
- }
-
- // When the application completes, it should stop all running containers
- LOG.info("Application completed. Stopping running containers");
- nmClientAsync.stop();
-
- // When the application completes, it should send a finish application
- // signal to the RM
- LOG.info("Application completed. Signalling finish to RM");
-
- FinalApplicationStatus appStatus;
- String appMessage = null;
- success = true;
- if (numFailedContainers.get() == 0 && numCompletedContainers.get() == numTotalContainers) {
- appStatus = FinalApplicationStatus.SUCCEEDED;
- } else {
- appStatus = FinalApplicationStatus.FAILED;
- appMessage = "Diagnostics." + ", total=" + numTotalContainers + ", completed="
- + numCompletedContainers.get() + ", allocated=" + numAllocatedContainers.get() + ", failed="
- + numFailedContainers.get();
- success = false;
- }
- try {
- resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
- } catch (YarnException ex) {
- LOG.error("Failed to unregister application", ex);
- } catch (IOException e) {
- LOG.error("Failed to unregister application", e);
- }
- done = true;
- resourceManager.stop();
- }
-
- /**
- * This handles the information that comes in from the RM while the AM
- * is running.
- */
- private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
- @Override
- public void onContainersCompleted(List<ContainerStatus> completedContainers) {
- LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
- for (ContainerStatus containerStatus : completedContainers) {
- LOG.info("Got container status for containerID=" + containerStatus.getContainerId() + ", state="
- + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus()
- + ", diagnostics=" + containerStatus.getDiagnostics());
-
- // non complete containers should not be here
- if (containerStatus.getState() != ContainerState.COMPLETE) {
- throw new IllegalStateException("Non-completed container given as completed by RM.");
- }
-
- // increment counters for completed/failed containers
- int exitStatus = containerStatus.getExitStatus();
- if (0 != exitStatus) {
- // container failed
- numCompletedContainers.incrementAndGet();
- numFailedContainers.incrementAndGet();
- } else {
- // nothing to do
- // container completed successfully
- numCompletedContainers.incrementAndGet();
- LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId());
- }
- }
- //stop infinite looping of run()
- if (numCompletedContainers.get() + numFailedContainers.get() == numAllocatedContainers.get()
- && doneAllocating) {
- done = true;
- }
- }
-
/**
 * Called by the RM client when the ResourceManager grants us containers.
 * Each container will host either the CC or one NC. Containers we no longer
 * need (host not in {@code pendingNCs} once the CC is up) are handed back.
 */
@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
    LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
    numAllocatedContainers.addAndGet(allocatedContainers.size());
    for (Container allocatedContainer : allocatedContainers) {
        synchronized (pendingNCs) {
            try {
                // If the CC is already up and this host no longer has a pending NC
                // request, the container is surplus - release it and move on.
                if (!pendingNCs.contains(containerToNode(allocatedContainer, clusterDesc)) && ccUp.get()) {
                    nmClientAsync.stopContainerAsync(allocatedContainer.getId(),
                            allocatedContainer.getNodeId());
                    continue;
                }
            } catch (UnknownHostException ex) {
                LOG.error("Unknown host allocated for us by RM- this shouldn't happen.", ex);
            }
        }
        LOG.info("Launching shell command on a new container." + ", containerId=" + allocatedContainer.getId()
                + ", containerNode=" + allocatedContainer.getNodeId().getHost() + ":"
                + allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
                + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory"
                + allocatedContainer.getResource().getMemory());

        LaunchAsterixContainer runnableLaunchContainer = new LaunchAsterixContainer(allocatedContainer,
                containerListener);
        Thread launchThread = new Thread(runnableLaunchContainer, "Asterix CC/NC");

        // I want to know if this node is the CC, because it must start before the NCs.
        LOG.info("Allocated: " + allocatedContainer.getNodeId().getHost());
        LOG.info("CC : " + cC.getId());
        synchronized (pendingNCs) {
            try {
                // Once the CC is up, an allocation on this host satisfies one pending NC.
                if (ccUp.get()) {
                    pendingNCs.remove(containerToNode(allocatedContainer, clusterDesc));
                }
            } catch (UnknownHostException ex) {
                LOG.error("Unknown host allocated for us by RM- this shouldn't happen.", ex);
            }
        }

        // Mark the CC as up *after* the pendingNCs bookkeeping above, so the CC's
        // own container is not counted against the NC requests.
        if (containerIsCC(allocatedContainer)) {
            ccUp.set(true);
        }
        // launch and start the container on a separate thread to keep
        // the main thread unblocked
        // as all containers may not be allocated at one go.
        launchThreads.add(launchThread);
        launchThread.start();
    }
}
-
- /**
- * Ask the processes on the container to gracefully exit.
- */
- @Override
- public void onShutdownRequest() {
- LOG.info("AM shutting down per request");
- done = true;
- }
-
/**
 * Node health/membership updates from the RM. Currently ignored.
 */
@Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {
    //TODO: This will become important when we deal with what happens if an NC dies
}
-
- @Override
- public float getProgress() {
- //return half way because progress is basically meaningless for us
- if (!doneAllocating) {
- return 0.0f;
- }
- return (float) 0.5;
- }
-
- @Override
- public void onError(Throwable arg0) {
- LOG.error("Fatal Error recieved by AM: " + arg0);
- done = true;
- }
- }
-
- private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
-
- private ConcurrentMap<ContainerId, Container> containers = new ConcurrentHashMap<>();
-
- public void addContainer(ContainerId containerId, Container container) {
- containers.putIfAbsent(containerId, container);
- }
-
- @Override
- public void onContainerStopped(ContainerId containerId) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Succeeded to stop Container " + containerId);
- }
- containers.remove(containerId);
- }
-
- @Override
- public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Container Status: id=" + containerId + ", status=" + containerStatus);
- }
- }
-
- @Override
- public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Succeeded to start Container " + containerId);
- }
- Container container = containers.get(containerId);
- if (container != null) {
- nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
- }
- }
-
- @Override
- public void onStartContainerError(ContainerId containerId, Throwable t) {
- LOG.error("Failed to start Container " + containerId);
- containers.remove(containerId);
- }
-
- @Override
- public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
- LOG.error("Failed to query the status of Container " + containerId);
- }
-
- @Override
- public void onStopContainerError(ContainerId containerId, Throwable t) {
- LOG.error("Failed to stop Container " + containerId);
- containers.remove(containerId);
- }
- }
-
/**
 * Runnable that connects to the NodeManager (via {@code nmClientAsync}), builds the
 * {@link ContainerLaunchContext} (local resources, environment, classpath and the
 * java command line for the CC/NC or a maintenance task), and dispatches the
 * container-start request.
 */
private class LaunchAsterixContainer implements Runnable {

    // Allocated container this thread is responsible for launching.
    final Container container;

    // Callback handler that tracks containers we have asked the NM to start.
    final NMCallbackHandler containerListener;

    /**
     * @param lcontainer
     *            Allocated container
     * @param containerListener
     *            Callback handler of the container
     */
    public LaunchAsterixContainer(Container lcontainer, NMCallbackHandler containerListener) {
        this.container = lcontainer;
        this.containerListener = containerListener;
    }

    /**
     * Builds the launch context (resources, env, command line) for this container
     * and asynchronously asks the NM to start it. Aborts (returns without starting)
     * if no command could be produced for this host.
     */
    @Override
    public void run() {
        LOG.info("Setting up container launch container for containerid=" + container.getId());
        ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
        // Set the local resources
        ctx.setLocalResources(localResources);

        //Set the env variables to be setup in the env where the application master will be run
        LOG.info("Set the environment for the node");
        Map<String, String> env = new HashMap<>();

        // Add AppMaster.jar location to classpath
        // At some point we should not be required to add
        // the hadoop specific classpaths to the env.
        // It should be provided out of the box.
        // For now setting all required classpaths including
        // the classpath to "." for the application jar
        StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
                .append("." + File.pathSeparatorChar + "*");
        for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
            classPathEnv.append(File.pathSeparatorChar);
            classPathEnv.append(c.trim());
        }
        classPathEnv.append('.').append(File.pathSeparatorChar).append("log4j.properties");

        // add the runtime classpath needed for tests to work
        if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
            classPathEnv.append(System.getProperty("path.separator"));
            classPathEnv.append(System.getProperty("java.class.path"));
            env.put("HADOOP_CONF_DIR", System.getProperty("user.dir") + File.separator + "target" + File.separator);
        }

        env.put("CLASSPATH", classPathEnv.toString());

        ctx.setEnvironment(env);
        LOG.info(ctx.getEnvironment().toString());
        // Pick the command list based on the AM's operating mode. A null or empty
        // command list means no CC/NC maps to this container's host - abort.
        List<String> startCmd = null;
        if (obliterate) {
            LOG.debug("AM in obliterate mode");
            startCmd = produceObliterateCommand(container);
        } else if (backup) {
            startCmd = produceBackupCommand(container);
            LOG.debug("AM in backup mode");
        } else if (restore) {
            startCmd = produceRestoreCommand(container);
            LOG.debug("AM in restore mode");
        } else {
            startCmd = produceStartCmd(container);
        }

        if (startCmd == null || startCmd.size() == 0) {
            LOG.fatal("Could not map one or more NCs to NM container hosts- aborting!");
            return;
        }

        for (String s : startCmd) {
            LOG.info("Command to execute: " + s);
        }
        ctx.setCommands(startCmd);
        containerListener.addContainer(container.getId(), container);
        //finally start the container!?
        nmClientAsync.startContainerAsync(container, ctx);
    }

    /**
     * Determines for a given container what the necessary command line
     * arguments are to start the Asterix processes on that instance.
     * Produces the CC command for the CC host (first time only), otherwise
     * an NC command configured from the matching cluster-description node.
     *
     * @param container
     *            The container to produce the commands for
     * @return A list of the commands that should be executed
     */
    private List<String> produceStartCmd(Container container) {
        List<String> commands = new ArrayList<>();
        // Set the necessary command to execute on the allocated container
        List<CharSequence> vargs = new ArrayList<>(5);

        vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
        vargs.add("-classpath " + '\'' + ASTERIX_ZIP_NAME + File.separator + "repo" + File.separator + "*\'");
        vargs.add("-Dapp.repo=" + ASTERIX_ZIP_NAME + File.separator + "repo" + File.separator);
        //first see if this node is the CC
        if (containerIsCC(container) && (ccStarted.get() == false)) {
            LOG.info("CC found on container" + container.getNodeId().getHost());
            //get our java opts
            vargs.add(ccJavaOpts);
            vargs.add(CC_CLASSNAME);
            vargs.add("-app-class " + CCApplication.class.getName());
            vargs.add("-address " + cC.getClusterIp());
            vargs.add("-client-listen-address " + cC.getClientIp());
            //pass CC optional parameters
            if (clusterDesc.getHeartbeatPeriod() != null) {
                vargs.add("-heartbeat-period " + String.valueOf(clusterDesc.getHeartbeatPeriod().intValue()));
            }
            if (clusterDesc.getMaxHeartbeatLapsePeriods() != null) {
                vargs.add("-heartbeat-max-misses "
                        + String.valueOf(clusterDesc.getMaxHeartbeatLapsePeriods().intValue()));
            }
            if (clusterDesc.getProfileDumpPeriod() != null) {
                vargs.add("-profile-dump-period " + String.valueOf(clusterDesc.getProfileDumpPeriod().intValue()));
            }
            if (clusterDesc.getJobHistorySize() != null) {
                vargs.add("-job-history-size " + String.valueOf(clusterDesc.getJobHistorySize().intValue()));
            }
            if (clusterDesc.getResultTimeToLive() != null) {
                vargs.add("-result-ttl " + String.valueOf(clusterDesc.getResultTimeToLive().intValue()));
            }
            if (clusterDesc.getResultSweepThreshold() != null) {
                vargs.add("-result-sweep-threshold "
                        + String.valueOf(clusterDesc.getResultSweepThreshold().intValue()));
            }
            if (clusterDesc.getCcRoot() != null) {
                vargs.add("-root-dir " + clusterDesc.getCcRoot());
            }
            ccStarted.set(true);

        } else {
            //now we need to know what node we are on, so we can apply the correct properties

            Node local;
            try {
                local = containerToNode(container, clusterDesc);
                LOG.info("Attempting to start NC on host " + local.getId());
                // Per-node iodevices override the cluster-wide default when present.
                String iodevice = local.getIodevices();
                if (iodevice == null) {
                    iodevice = clusterDesc.getIodevices();
                }
                vargs.add(ncJavaOpts);
                vargs.add(NC_CLASSNAME);
                vargs.add("-app-class " + NCApplication.class.getName());
                vargs.add("-node-id " + local.getId());
                vargs.add("-cluster-address " + cC.getClusterIp());
                vargs.add("-iodevices " + iodevice);
                vargs.add("-address " + local.getClusterIp());
                vargs.add("-data-listen-address " + local.getClusterIp());
                vargs.add("-result-listen-address " + local.getClusterIp());
                if (initial) {
                    vargs.add("-initial-run ");
                }
            } catch (UnknownHostException e) {
                // No NC is configured for this host; vargs stays partial and the
                // resulting command will be broken - the error is only logged here.
                LOG.error("Unable to find NC or CC configured for host: " + container.getId() + " " + e);
            }
        }

        // Add log redirect params
        vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "stdout");
        vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "stderr");

        // Get final commmand
        StringBuilder command = new StringBuilder();
        for (CharSequence str : vargs) {
            command.append(str).append(" ");
        }
        commands.add(command.toString());
        // NOTE(review): logs the final command at ERROR level - likely leftover debugging.
        LOG.error(Arrays.toString(commands.toArray()));
        return commands;
    }

    /**
     * Builds the command that deletes all Asterix data (stores and txn logs)
     * on this container's iodevices. Returns a no-op command for a CC-only
     * host, or null if the host maps to no configured NC.
     */
    private List<String> produceObliterateCommand(Container container) {
        //if this container has no NCs on it, nothing will be there to delete.
        Node local = null;
        List<String> iodevices = null;
        try {
            local = containerToNode(container, clusterDesc);
            if (local.getIodevices() == null) {
                iodevices = Arrays.asList(clusterDesc.getIodevices().split(",", -1));
            } else {
                iodevices = Arrays.asList(local.getIodevices().split(",", -1));
            }
        } catch (UnknownHostException e) {
            //we expect this may happen for the CC if it isn't colocated with an NC. otherwise it is not suppose to happen.
            if (!containerIsCC(container)) {
                LOG.error("Unable to find NC configured for host: " + container.getId() + e);
                return null;
            } else {
                // CC-only host: return a blank command so the container still launches.
                return Arrays.asList("");
            }
        }
        StringBuilder classPathEnv = new StringBuilder("").append("*");
        classPathEnv.append(File.pathSeparatorChar).append("log4j.properties");

        List<String> commands = new ArrayList<>();
        Vector<CharSequence> vargs = new Vector<>(5);
        vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
        vargs.add("-cp " + classPathEnv.toString());
        vargs.add(OBLITERATOR_CLASSNAME);
        for (String s : iodevices) {
            vargs.add(s + File.separator + clusterDesc.getStore());
            LOG.debug("Deleting from: " + s);
            //logs only exist on 1st iodevice
            if (iodevices.indexOf(s) == 0) {
                vargs.add(clusterDesc.getTxnLogDir() + "txnLogs" + File.separator);
                LOG.debug("Deleting logs from: " + clusterDesc.getTxnLogDir());
            }
        }
        vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "stdout");
        vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "stderr");
        StringBuilder command = new StringBuilder();
        for (CharSequence str : vargs) {
            command.append(str).append(" ");
        }
        commands.add(command.toString());
        return commands;
    }

    /**
     * Builds the command that copies this node's stores and txn logs into a
     * timestamped backup folder under {@code instanceConfPath} on DFS.
     * Returns a no-op command for a CC-only host, or null on error (which
     * aborts the launch in {@link #run()}).
     */
    private List<String> produceBackupCommand(Container container) {
        Node local = null;
        List<String> iodevices = null;
        try {
            local = containerToNode(container, clusterDesc);
            if (local.getIodevices() == null) {
                iodevices = Arrays.asList(clusterDesc.getIodevices().split(",", -1));
            } else {
                iodevices = Arrays.asList(local.getIodevices().split(",", -1));
            }
        } catch (UnknownHostException e) {
            //we expect this may happen for the CC if it isn't colocated with an NC. otherwise it is not suppose to happen.
            if (!containerIsCC(container)) {
                LOG.error("Unable to find NC configured for host: " + container.getId() + e);
                return null;
            } else {
                return Arrays.asList("");
            }
        }
        StringBuilder classPathEnv = new StringBuilder("").append("." + File.separator + "*");
        for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
            classPathEnv.append(File.pathSeparatorChar);
            classPathEnv.append(c.trim());
        }
        classPathEnv.append(File.pathSeparatorChar).append("." + File.separator + "log4j.properties");

        List<String> commands = new ArrayList<>();
        Vector<CharSequence> vargs = new Vector<>(5);
        vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
        vargs.add("-cp " + classPathEnv.toString());
        vargs.add(HDFS_BACKUP_CLASSNAME);
        vargs.add("-backup");

        String dstBase = instanceConfPath + "backups" + Path.SEPARATOR + backupTimestamp + Path.SEPARATOR
                + local.getId();
        try {
            createBackupFolder(dstBase);
        } catch (IOException e) {
            //something very bad happened- return null to cause attempt to abort
            return null;
        }
        for (String s : iodevices) {
            // Flatten the iodevice path into a single directory name by joining
            // its "/"-separated components with "_".
            List<String> ioComponents = Arrays.asList(s.split("\\/"));
            StringBuilder dst = new StringBuilder().append(dstBase);
            for (String io : ioComponents) {
                dst.append(io);
                if (ioComponents.indexOf(io) != ioComponents.size() - 1) {
                    dst.append("_");
                }
            }
            dst.append(Path.SEPARATOR);
            // Each vararg is a "src,dst" pair consumed by the backup class.
            vargs.add(s + File.separator + clusterDesc.getStore() + "," + dst);
            LOG.debug("Backing up from: " + s);
            //logs only exist on 1st iodevice
            if (iodevices.indexOf(s) == 0) {
                LOG.debug("Backing up logs from: " + clusterDesc.getTxnLogDir());
                vargs.add(clusterDesc.getTxnLogDir() + "txnLogs" + File.separator + "," + dst);
            }
        }
        LOG.debug("Backing up to: " + instanceConfPath + "backups" + Path.SEPARATOR + local.getId());

        vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "stdout");
        vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "stderr");
        StringBuilder command = new StringBuilder();
        for (CharSequence str : vargs) {
            command.append(str).append(" ");
        }
        commands.add(command.toString());
        return commands;
    }

    /**
     * Creates the destination folder for a backup on the default DFS.
     *
     * @param path the DFS path to create
     * @throws IOException if the filesystem cannot be reached or the mkdir fails
     */
    private void createBackupFolder(String path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path backupFolder = new Path(path);
        fs.mkdirs(backupFolder);
    }

    /**
     * Builds the command that restores this node's stores and txn logs from
     * the DFS snapshot named by {@code snapName}. The CC container gets a
     * blank command; returns null if the host maps to no configured NC.
     */
    private List<String> produceRestoreCommand(Container container) {
        if (containerIsCC(container)) {
            List<String> blank = new ArrayList<>();
            blank.add("");
            return blank;
        }
        Node local = null;
        List<String> iodevices = null;
        try {
            local = containerToNode(container, clusterDesc);
            if (local.getIodevices() == null) {
                iodevices = Arrays.asList(clusterDesc.getIodevices().split(",", -1));
            } else {
                iodevices = Arrays.asList(local.getIodevices().split(",", -1));
            }
        } catch (UnknownHostException e) {
            //we expect this may happen for the CC if it isn't colocated with an NC. otherwise it is not suppose to happen.
            if (!containerIsCC(container)) {
                LOG.error("Unable to find NC configured for host: " + container.getId() + e);
                return null;
            } else {
                return Arrays.asList("");
            }
        }
        StringBuilder classPathEnv = new StringBuilder("").append("." + File.separator + "*");
        for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
            classPathEnv.append(File.pathSeparatorChar);
            classPathEnv.append(c.trim());
        }
        classPathEnv.append(File.pathSeparatorChar).append("." + File.separator + "log4j.properties");

        List<String> commands = new ArrayList<>();
        Vector<CharSequence> vargs = new Vector<>(5);
        vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
        vargs.add("-cp " + classPathEnv.toString());
        vargs.add(HDFS_BACKUP_CLASSNAME);
        vargs.add("-restore");
        // snapName is parsed as a long; presumably it is the backup timestamp - TODO confirm.
        String srcBase = instanceConfPath + "backups" + Path.SEPARATOR + Long.parseLong(snapName) + Path.SEPARATOR
                + local.getId();
        for (String s : iodevices) {
            // Same "_"-flattened directory naming as produceBackupCommand.
            List<String> ioComponents = Arrays.asList(s.split("\\/"));
            StringBuilder src = new StringBuilder().append(srcBase);
            for (String io : ioComponents) {
                src.append(io);
                if (ioComponents.indexOf(io) != ioComponents.size() - 1) {
                    src.append("_");
                }
            }
            src.append(Path.SEPARATOR);
            try {
                FileSystem fs = FileSystem.get(conf);
                FileStatus[] backups = fs.listStatus(new Path(src.toString()));
                for (FileStatus b : backups) {
                    // Skip txn logs (handled below) and the metadata root.
                    if (!b.getPath().toString().contains("txnLogs")
                            && !b.getPath().toString().contains(File.separator + StorageConstants.METADATA_ROOT)) {
                        vargs.add(b.getPath() + "," + s + File.separator + clusterDesc.getStore());
                    }
                }
            } catch (IOException e) {
                LOG.error("Could not stat backup directory in DFS");
            }
            vargs.add(src + "," + s + clusterDesc.getStore());
            LOG.debug("Restoring from: " + s);
            //logs only exist on 1st iodevice
            if (iodevices.indexOf(s) == 0) {
                vargs.add(src + "txnLogs" + File.separator + "," + clusterDesc.getTxnLogDir() + File.separator);

                LOG.debug("Restoring logs from: " + clusterDesc.getTxnLogDir());
            }
        }
        LOG.debug("Restoring to: " + instanceConfPath + "backups" + Path.SEPARATOR + local.getId());

        vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "stdout");
        vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "stderr");
        StringBuilder command = new StringBuilder();
        for (CharSequence str : vargs) {
            command.append(str).append(" ");
        }
        commands.add(command.toString());
        return commands;
    }

}
-}