You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/03/04 17:13:45 UTC
[05/12] drill git commit: DRILL-1170: YARN integration for Drill
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/client/AMRunner.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/client/AMRunner.java b/drill-yarn/src/main/java/org/apache/drill/yarn/client/AMRunner.java
new file mode 100644
index 0000000..5252b88
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/client/AMRunner.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.client;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.util.Map;
+
+import org.apache.drill.yarn.core.AppSpec;
+import org.apache.drill.yarn.core.DoYUtil;
+import org.apache.drill.yarn.core.DrillOnYarnConfig;
+import org.apache.drill.yarn.core.LaunchSpec;
+import org.apache.drill.yarn.core.YarnClientException;
+import org.apache.drill.yarn.core.YarnRMClient;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+
+import com.typesafe.config.Config;
+
+/**
+ * Launch the AM through YARN. Builds the launch description, then tracks
+ * the launch operation itself. Finally, provides the user with links to
+ * track the AM both through YARN and via the AM's own web UI.
+ */
+
+public class AMRunner {
+ // Configuration supplied to the constructor; source of all launch settings read below.
+ private Config config;
+ // When true, the launch spec and YARN limits are also printed during a real launch.
+ private boolean verbose;
+ // Application id taken from the YARN response in createApp(); not set on a dry run.
+ private ApplicationId appId;
+ // The public fields below are not assigned anywhere in this class;
+ // presumably the caller populates them before run() -- TODO confirm.
+ // Localized resources copied into the AM launch spec by buildSpec().
+ public Map<String, LocalResource> resources;
+ // Passed to the AM via an environment variable when Drill is localized.
+ public String drillArchivePath;
+ // Optional; passed to the AM via an environment variable when non-null and Drill is localized.
+ public String siteArchivePath;
+ // Prefix of the AM launch command (remoteDrillHome + "/bin/drill-am.sh").
+ public String remoteDrillHome;
+ // Optional; when non-null it is passed to the launch script as the --site argument.
+ public String remoteSiteDir;
+ // YARN client created in connectToYarn().
+ private YarnRMClient client;
+ // Response from client.createAppMaster(); supplies the app id and the maximum resource limits.
+ private GetNewApplicationResponse appResponse;
+ // When true, run() only builds and prints the launch spec.
+ private boolean dryRun;
+
+  /**
+   * Creates a runner for the Application Master launch.
+   *
+   * @param config configuration from which the launch settings are read
+   * @param verbose whether to print extra diagnostics during a launch
+   * @param dryRun whether to only build and print the launch spec
+   */
+  public AMRunner(Config config, boolean verbose, boolean dryRun) {
+    this.dryRun = dryRun;
+    this.verbose = verbose;
+    this.config = config;
+  }
+
+  /**
+   * Connects to YARN, then either prints the launch spec (dry run) or
+   * performs the actual Application Master launch.
+   *
+   * @throws ClientException if the dry run or the launch fails
+   */
+  public void run() throws ClientException {
+    connectToYarn();
+    if (!dryRun) {
+      doLaunch();
+      return;
+    }
+    doDryRun();
+  }
+
+  /** Creates the YARN client, reporting progress on standard output. */
+  private void connectToYarn() {
+    final PrintStream console = System.out;
+    console.print("Loading YARN Config...");
+    client = new YarnRMClient();
+    console.println(" Loaded.");
+  }
+
+  /**
+   * Builds the AM launch spec and prints it to standard output without
+   * submitting anything to YARN.
+   *
+   * @throws ClientException if the spec cannot be built
+   */
+  private void doDryRun() throws ClientException {
+    dump(buildSpec(), System.out);
+  }
+
+  /**
+   * Performs the real launch: allocates the application, builds and
+   * validates the launch spec, submits it, waits for the AM to start and
+   * finally records the application id in the app id file.
+   *
+   * @throws ClientException if any launch step fails
+   */
+  private void doLaunch() throws ClientException {
+    createApp();
+    final AppSpec spec = buildSpec();
+    if (verbose) {
+      dump(spec, System.out);
+    }
+    validateResources(spec);
+    launchApp(spec);
+    waitForStartAndReport(spec.appName);
+    writeAppIdFile();
+  }
+
+  /**
+   * Prints the launch spec between two separator rules.
+   *
+   * @param master the launch spec to print
+   * @param out destination stream
+   */
+  private void dump(AppSpec master, PrintStream out) {
+    final String rule = "----------------------------------------------";
+    out.println(rule);
+    out.println("Application Master Launch Spec");
+    master.dump(out);
+    out.println(rule);
+  }
+
+  /**
+   * Builds the launch specification for the Application Master: environment
+   * variables, launch command and arguments, localized resources and the
+   * container sizing read from the configuration.
+   *
+   * @return the populated AM launch spec
+   * @throws ClientException per the declared signature
+   */
+  private AppSpec buildSpec() throws ClientException {
+    final AppSpec spec = new AppSpec();
+
+    // Heap memory
+
+    final String amHeap = config.getString(DrillOnYarnConfig.AM_HEAP);
+    spec.env.put(DrillOnYarnConfig.AM_HEAP_ENV_VAR, amHeap);
+
+    // Any additional VM arguments from the config file.
+
+    addIfSet(spec, DrillOnYarnConfig.AM_VM_ARGS,
+        DrillOnYarnConfig.AM_JAVA_OPTS_ENV_VAR);
+
+    // Any user specified override jars.
+    // Not really needed by the AM.
+
+    addIfSet(spec, DrillOnYarnConfig.AM_PREFIX_CLASSPATH,
+        DrillOnYarnConfig.DRILL_CLASSPATH_PREFIX_ENV_VAR);
+
+    // Any user specified classpath.
+
+    addIfSet(spec, DrillOnYarnConfig.AM_CLASSPATH,
+        DrillOnYarnConfig.DRILL_CLASSPATH_ENV_VAR);
+
+    // Any user-specified library path.
+
+    addIfSet(spec, DrillOnYarnConfig.JAVA_LIB_PATH,
+        DrillOnYarnConfig.DOY_LIBPATH_ENV_VAR);
+
+    // AM logs (of which there are none).
+    // Relies on the LOG_DIR_EXPANSION_VAR marker, which is replaced by
+    // the container log directory.
+    // Must be set for the AM to prevent drill-config.sh from trying to create
+    // the log directory in $DRILL_HOME (which won't be writable under YARN).
+
+    final boolean yarnLogsDisabled =
+        config.getBoolean(DrillOnYarnConfig.DISABLE_YARN_LOGS);
+    if (!yarnLogsDisabled) {
+      spec.env.put("DRILL_YARN_LOG_DIR",
+          ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+    }
+
+    // AM launch script.
+    // The Drill home location is either a non-localized location
+    // or, more typically, the expanded Drill directory under the
+    // container's working directory. When the directory is localized,
+    // we rely on the fact that the current working directory is
+    // set to the container directory, so we just need the name
+    // of the Drill folder under the cwd.
+
+    spec.command = remoteDrillHome + "/bin/drill-am.sh";
+
+    // If there is a site directory, add it as an argument.
+
+    if (remoteSiteDir != null) {
+      spec.cmdArgs.add("--site");
+      spec.cmdArgs.add(remoteSiteDir);
+    }
+
+    // Strangely, YARN has no way to tell an AM what its app ID
+    // is. So, we pass it along here. No app is allocated on a dry run.
+
+    final String appIdText;
+    if (dryRun) {
+      appIdText = "Unknown";
+    } else {
+      appIdText = appId.toString();
+    }
+    spec.env.put(DrillOnYarnConfig.APP_ID_ENV_VAR, appIdText);
+
+    // Debug launch: dumps environment variables and other information
+    // in the launch script.
+
+    if (config.getBoolean(DrillOnYarnConfig.AM_DEBUG_LAUNCH)) {
+      spec.env.put(DrillOnYarnConfig.DRILL_DEBUG_ENV_VAR, "1");
+    }
+
+    // If localized, add the Drill archive and, optionally, the site archive.
+    // YARN has no way to tell an AM which localized resources are
+    // available, so we pass them along as environment variables.
+
+    if (config.getBoolean(DrillOnYarnConfig.LOCALIZE_DRILL)) {
+      spec.env.put(DrillOnYarnConfig.DRILL_ARCHIVE_ENV_VAR, drillArchivePath);
+      if (siteArchivePath != null) {
+        spec.env.put(DrillOnYarnConfig.SITE_ARCHIVE_ENV_VAR, siteArchivePath);
+      }
+    }
+
+    // Localized resources
+
+    spec.resources.putAll(resources);
+
+    // Container specification.
+
+    spec.memoryMb = config.getInt(DrillOnYarnConfig.AM_MEMORY);
+    spec.vCores = config.getInt(DrillOnYarnConfig.AM_VCORES);
+    spec.disks = config.getDouble(DrillOnYarnConfig.AM_DISKS);
+    spec.appName = config.getString(DrillOnYarnConfig.APP_NAME);
+    spec.queueName = config.getString(DrillOnYarnConfig.YARN_QUEUE);
+    spec.priority = config.getInt(DrillOnYarnConfig.YARN_PRIORITY);
+    spec.nodeLabelExpr = config.getString(DrillOnYarnConfig.AM_NODE_LABEL_EXPR);
+    return spec;
+  }
+
+  /**
+   * Copies a configuration value into the launch environment, but only
+   * when the value is not blank.
+   *
+   * @param spec launch spec whose environment is updated
+   * @param configParam configuration key to read
+   * @param envVar environment variable name to set
+   */
+  private void addIfSet(LaunchSpec spec, String configParam, String envVar) {
+    final String setting = config.getString(configParam);
+    if (DoYUtil.isBlank(setting)) {
+      return;
+    }
+    spec.env.put(envVar, setting);
+  }
+
+  /**
+   * Asks YARN to allocate a new application, remembers the response and the
+   * application id, and prints the id.
+   *
+   * @throws ClientException if YARN fails to allocate the application
+   */
+  private void createApp() throws ClientException {
+    try {
+      appResponse = client.createAppMaster();
+    } catch (YarnClientException e) {
+      final String problem = "Failed to allocate Drill application master";
+      throw new ClientException(problem, e);
+    }
+    appId = appResponse.getApplicationId();
+    final String idText = appId.toString();
+    System.out.println("Application ID: " + idText);
+  }
+
+  /**
+   * Checks the memory and vcore requests of both the Application Master and
+   * the Drillbit against the maximum container size reported by YARN.
+   *
+   * @param master the AM launch spec whose requests are checked
+   * @throws ClientException if any request exceeds the YARN maximum
+   */
+  private void validateResources( AppSpec master ) throws ClientException {
+
+    // Memory and core checks per YARN app specs.
+
+    final int maxMemory = appResponse.getMaximumResourceCapability().getMemory();
+    final int maxCores = appResponse.getMaximumResourceCapability().getVirtualCores();
+    if (verbose) {
+      System.out.println("Max Memory: " + maxMemory);
+      System.out.println("Max Cores: " + maxCores);
+    }
+
+    // YARN behaves very badly if we request a container larger than the
+    // maximum.
+
+    requireWithinLimit("memory", "application master", maxMemory, master.memoryMb);
+    requireWithinLimit("vcores", "application master", maxCores, master.vCores);
+
+    // Verify the limits for the Drillbit as well. Each setting is read only
+    // once the preceding check has passed.
+
+    requireWithinLimit("memory", "Drillbit", maxMemory,
+        config.getInt(DrillOnYarnConfig.DRILLBIT_MEMORY));
+    requireWithinLimit("vcores", "Drillbit", maxCores,
+        config.getInt(DrillOnYarnConfig.DRILLBIT_VCORES));
+  }
+
+  /** Throws a ClientException when the requested amount exceeds the YARN limit. */
+  private void requireWithinLimit(String resource, String requester,
+      long limit, long requested) throws ClientException {
+    if (requested > limit) {
+      throw new ClientException("YARN maximum " + resource + " is " + limit
+          + " but the " + requester + " requests " + requested);
+    }
+  }
+
+  /**
+   * Submits the Application Master launch spec to YARN.
+   *
+   * @param master the launch spec to submit
+   * @throws ClientException if YARN rejects or fails the submission
+   */
+  private void launchApp(AppSpec master) throws ClientException {
+    try {
+      client.submitAppMaster(master);
+    } catch (YarnClientException e) {
+      final String problem = "Failed to start Drill application master";
+      throw new ClientException(problem, e);
+    }
+  }
+
+  /**
+   * Write the app id file needed for subsequent commands. The app id file is
+   * the only way we know the YARN application associated with our Drill-on-YARN
+   * session. This file is read by subsequent status, resize and stop commands
+   * so we can find our Drill AM on the YARN cluster.
+   *
+   * @throws ClientException if the file cannot be created or written; the
+   *           underlying I/O error is attached as the cause
+   */
+  private void writeAppIdFile() throws ClientException {
+    // Write the appid file that lets us work with the app later
+    // (Analogous to a pid file.)
+    // File goes into the directory above Drill Home (which should be the
+    // folder that contains the localized archive) and is named for the
+    // ZK cluster (to ensure that the name is a valid file name.)
+
+    File appIdFile = ClientCommand.getAppIdFile();
+    // try-with-resources closes the writer on every path.
+    try (PrintWriter writer = new PrintWriter(new FileWriter(appIdFile))) {
+      writer.println(appId);
+      // PrintWriter never throws on write failures; it only records them.
+      // checkError() flushes and reports whether one occurred.
+      if (writer.checkError()) {
+        throw new IOException("Error while writing " + appIdFile.getAbsolutePath());
+      }
+    } catch (IOException e) {
+      throw new ClientException(
+          "Failed to write appid file: " + appIdFile.getAbsolutePath(), e);
+    }
+  }
+
+  /**
+   * Poll YARN to track the launch process of the application so that we can
+   * wait until the AM is live before pointing the user to the AM's web UI.
+   * <p>
+   * If the waiting thread is interrupted, polling stops early, the current
+   * status is still reported, and the thread's interrupt flag is restored
+   * before {@link #run(String)} returns.
+   */
+  private class StartMonitor {
+    StatusCommand.Reporter reporter;
+    private YarnApplicationState state;
+    private int pollWaitSec;
+    private int startupWaitSec;
+    // Set when a poll sleep was interrupted. The flag is restored only after
+    // the final status report, so the report itself runs uninterrupted.
+    private boolean interrupted;
+
+    /**
+     * Reads the poll interval (clamped to at least one second) and the total
+     * startup wait time from the configuration.
+     */
+    public StartMonitor() {
+      pollWaitSec = config.getInt(DrillOnYarnConfig.CLIENT_POLL_SEC);
+      if (pollWaitSec < 1) {
+        pollWaitSec = 1;
+      }
+      startupWaitSec = config.getInt(DrillOnYarnConfig.CLIENT_START_WAIT_SEC);
+    }
+
+    /**
+     * Polls the application status until it is no longer starting, the
+     * startup wait time is used up, or the thread is interrupted; then
+     * displays the status.
+     *
+     * @param appName application name shown in the progress message
+     * @throws ClientException if a status request fails
+     */
+    void run(String appName) throws ClientException {
+      System.out.print("Launching " + appName + "...");
+      reporter = new StatusCommand.Reporter(client);
+      reporter.getReport();
+      if (!reporter.isStarting()) {
+        return;
+      }
+      updateState(reporter.getState());
+      try {
+        int attemptCount = startupWaitSec / pollWaitSec;
+        for (int attempt = 0; attempt < attemptCount; attempt++) {
+          if (!poll()) {
+            break;
+          }
+        }
+      } finally {
+        System.out.println();
+      }
+      try {
+        reporter.display(verbose, true);
+        if (reporter.isStarting()) {
+          System.out.println(
+              "Application Master is slow to start, use the 'status' command later to check status.");
+        }
+      } finally {
+        if (interrupted) {
+          // Do not swallow the interrupt: let the caller observe it.
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    /**
+     * Sleeps for one poll interval, then refreshes the status.
+     *
+     * @return true to keep polling; false once the application is no longer
+     *         starting or the sleep was interrupted
+     * @throws ClientException if the status request fails
+     */
+    private boolean poll() throws ClientException {
+      try {
+        // Long arithmetic: a large poll interval must not overflow int.
+        Thread.sleep(pollWaitSec * 1000L);
+      } catch (InterruptedException e) {
+        interrupted = true;
+        return false;
+      }
+      reporter.getReport();
+      if (!reporter.isStarting()) {
+        return false;
+      }
+      YarnApplicationState newState = reporter.getState();
+      if (newState == state) {
+        System.out.print(".");
+        return true;
+      }
+      System.out.println();
+      updateState(newState);
+      return true;
+    }
+
+    /** Records the new state and, in verbose mode, prints it. */
+    private void updateState(YarnApplicationState newState) {
+      state = newState;
+      if (verbose) {
+        System.out.print("Application State: ");
+        System.out.println(state.toString());
+        System.out.print("Starting...");
+      }
+    }
+  }
+
+  /**
+   * Waits for the Application Master to start and reports its status.
+   *
+   * @param appName application name shown in the progress message
+   * @throws ClientException if a status request fails
+   */
+  private void waitForStartAndReport(String appName) throws ClientException {
+    new StartMonitor().run(appName);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/client/CleanCommand.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/client/CleanCommand.java b/drill-yarn/src/main/java/org/apache/drill/yarn/client/CleanCommand.java
new file mode 100644
index 0000000..1fcba2d
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/client/CleanCommand.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.client;
+
+import java.io.File;
+
+import org.apache.drill.yarn.core.DfsFacade;
+import org.apache.drill.yarn.core.DrillOnYarnConfig;
+
+import com.typesafe.config.Config;
+
+import org.apache.drill.yarn.core.DfsFacade.DfsFacadeException;
+
+/**
+ * Client command that removes the Drill archive and, when a site directory
+ * is in use, the site archive from DFS. Does nothing when Drill is not
+ * localized.
+ */
+public class CleanCommand extends ClientCommand {
+  private Config config;
+  private DfsFacade dfs;
+
+  @Override
+  public void run() throws ClientException {
+    config = DrillOnYarnConfig.config();
+    if (isLocalized()) {
+      connectToDfs();
+      removeDrillArchive();
+      removeSiteArchive();
+    } else {
+      System.out.println("Not using localized files; nothing to clean.");
+    }
+  }
+
+  /**
+   * @return the value of the localize-Drill configuration setting
+   */
+  public boolean isLocalized() {
+    return config.getBoolean(DrillOnYarnConfig.LOCALIZE_DRILL);
+  }
+
+  /**
+   * Opens the DFS connection used to remove the archives.
+   *
+   * @throws ClientException if the connection cannot be established
+   */
+  protected void connectToDfs() throws ClientException {
+    try {
+      System.out.print("Connecting to DFS...");
+      dfs = new DfsFacade(config);
+      dfs.connect();
+      System.out.println(" Connected.");
+    } catch (DfsFacadeException e) {
+      System.out.println("Failed.");
+      throw new ClientException("Failed to connect to DFS", e);
+    }
+  }
+
+  /** Removes the Drill archive, identified by the file name of the configured local archive path. */
+  private void removeDrillArchive() {
+    final File localArchive =
+        new File(config.getString(DrillOnYarnConfig.DRILL_ARCHIVE_PATH));
+    removeArchive(localArchive.getName());
+  }
+
+  /**
+   * Removes one archive from DFS. A failure is reported on standard error
+   * and does not abort the command.
+   */
+  private void removeArchive(String archiveName) {
+    System.out.print("Removing " + archiveName + " ...");
+    try {
+      dfs.removeDrillFile(archiveName);
+      System.out.println(" Removed");
+    } catch (DfsFacadeException e) {
+      System.out.println();
+      System.err.println(e.getMessage());
+    }
+  }
+
+  /** Removes the site archive, but only when a site directory is configured. */
+  private void removeSiteArchive() {
+    if (DrillOnYarnConfig.instance().hasSiteDir()) {
+      removeArchive(DrillOnYarnConfig.SITE_ARCHIVE_NAME);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/client/ClientCommand.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/client/ClientCommand.java b/drill-yarn/src/main/java/org/apache/drill/yarn/client/ClientCommand.java
new file mode 100644
index 0000000..469d04c
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/client/ClientCommand.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.client;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+
+import org.apache.drill.yarn.core.DrillOnYarnConfig;
+import org.apache.drill.yarn.core.YarnRMClient;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+/**
+ * Base class for client commands. Holds the parsed command line options and
+ * provides helpers to locate, read and delete the app id file.
+ */
+public abstract class ClientCommand {
+  protected CommandLineOptions opts;
+
+  public void setOpts(CommandLineOptions opts) {
+    this.opts = opts;
+  }
+
+  /**
+   * Executes the command.
+   *
+   * @throws ClientException if the command fails
+   */
+  public abstract void run() throws ClientException;
+
+  /**
+   * Return the path to the app id file. The file goes into the directory above
+   * Drill Home (which should be the folder that contains the localized archive)
+   * and is named for the ZK cluster (to ensure that the name is a valid file
+   * name.)
+   *
+   * @return the local app id file as reported by the Drill-on-YARN configuration
+   */
+  protected static File getAppIdFile() {
+    return DrillOnYarnConfig.instance().getLocalAppIdFile();
+  }
+
+  /**
+   * Determines the application id to operate on: the id given on the
+   * command line if present, otherwise the id stored in the app id file.
+   *
+   * @return the application id
+   * @throws ClientException if no id was given and the app id file is
+   *           missing, unreadable, or holds no id
+   */
+  protected ApplicationId checkAppId() throws ClientException {
+    String appIdStr;
+    if (opts.appId != null) {
+      appIdStr = opts.appId;
+    } else {
+      File appIdFile = getAppIdFile();
+      appIdStr = loadAppId(appIdFile);
+      if (appIdStr == null) {
+        throw new ClientException(
+            "No Drill cluster is running (did not find file appid file: "
+                + appIdFile.toString() + ")");
+      }
+    }
+    return ConverterUtils.toApplicationId(appIdStr);
+  }
+
+  /**
+   * Creates a YARN client bound to the application id from
+   * {@link #checkAppId()}.
+   *
+   * @throws ClientException if no application id can be determined
+   */
+  protected YarnRMClient getClient() throws ClientException {
+    return new YarnRMClient(checkAppId());
+  }
+
+  /**
+   * Reads the application id from the first line of the given file.
+   *
+   * @param appIdFile the app id file to read
+   * @return the trimmed first line, or null if the file is missing, cannot
+   *         be read, is empty, or its first line is blank
+   */
+  protected String loadAppId(File appIdFile) {
+    try (BufferedReader reader = new BufferedReader(new FileReader(appIdFile))) {
+      String appIdStr = reader.readLine();
+      if (appIdStr == null) {
+        return null;
+      }
+      appIdStr = appIdStr.trim();
+      // A blank line is not an application id; report it as "no id" so the
+      // caller raises its normal error instead of trying to parse "".
+      return appIdStr.isEmpty() ? null : appIdStr;
+    } catch (FileNotFoundException e) {
+      // No app id file: no cluster was started from this client.
+      return null;
+    } catch (IOException e) {
+      // Unreadable file: treated the same as a missing file.
+      return null;
+    }
+  }
+
+  /** Deletes the app id file. Best effort: the result of the delete is not checked. */
+  protected void removeAppIdFile() {
+    getAppIdFile().delete();
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/client/ClientContext.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/client/ClientContext.java b/drill-yarn/src/main/java/org/apache/drill/yarn/client/ClientContext.java
new file mode 100644
index 0000000..377b3b3
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/client/ClientContext.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.client;
+
+import java.io.PrintStream;
+
+/**
+ * Provides a static set of contextual operations that can be configured one way
+ * for production, a separate way for unit tests.
+ */
+public class ClientContext {
+
+  /** Stream for normal client output; initially {@link System#out}. */
+  public static PrintStream out = System.out;
+
+  /** Stream for client error output; initially {@link System#err}. */
+  public static PrintStream err = System.err;
+
+  /** The installed context; null until one of the init methods is called. */
+  private static ClientContext instance;
+
+  /**
+   * @return the installed context, or null if none has been installed
+   */
+  public static ClientContext instance() {
+    return instance;
+  }
+
+  /** Installs a default context. */
+  public static void init() {
+    init(new ClientContext());
+  }
+
+  /**
+   * Installs the given context, replacing any previous one.
+   *
+   * @param instance the context to install
+   */
+  protected static void init(ClientContext instance) {
+    ClientContext.instance = instance;
+  }
+
+  /**
+   * Terminates the JVM with the given exit code.
+   *
+   * @param exitCode process exit code
+   */
+  public void exit(int exitCode) {
+    System.exit(exitCode);
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/client/ClientException.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/client/ClientException.java b/drill-yarn/src/main/java/org/apache/drill/yarn/client/ClientException.java
new file mode 100644
index 0000000..24c062b
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/client/ClientException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.client;
+
+/**
+ * Checked exception raised by the client commands.
+ */
+public class ClientException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Wraps another exception, reusing its message.
+   *
+   * @param e the underlying exception; becomes the cause
+   */
+  public ClientException(Exception e) {
+    super(e.getMessage(), e);
+  }
+
+  /**
+   * @param msg description of the failure
+   * @param e the underlying exception; becomes the cause
+   */
+  public ClientException(String msg, Exception e) {
+    super(msg, e);
+  }
+
+  /**
+   * @param msg description of the failure
+   */
+  public ClientException(String msg) {
+    super(msg);
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/client/CommandLineOptions.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/client/CommandLineOptions.java b/drill-yarn/src/main/java/org/apache/drill/yarn/client/CommandLineOptions.java
new file mode 100644
index 0000000..174265d
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/client/CommandLineOptions.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.client;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Drill YARN client command line options.
+ * <p><pre>
+ * DrillYarnClient -h|--help |
+ * start |
+ * stop |
+ * status |
+ * resize [+|-]n
+ * </pre></p>
+ * <ul>
+ * <li>help: Prints command line usage</li>
+ * <li>start: starts the defined cluster</li>
+ * <li>stop: stops the defined cluster</li>
+ * <li>resize: adds (+n), removes (-n) or resizes (n) the cluster</li>
+ * <li>status: prints status about the cluster</li>
+ * </ul>
+ * <p>
+ * This is a do-it-yourself parser because the command line parser
+ * used by Drill does not accept parameters (arguments) without a dash,
+ * and does not accept arguments (such as resize -3) with a dash.
+ */
+
+public class CommandLineOptions {
+
+  /**
+   * The commands understood by the client. Each command has a command-line
+   * word and a description; a command without a description is hidden from
+   * the usage output.
+   */
+  public enum Command {
+    HELP("help", "Provide description of usage."),
+
+    /**
+     * Primary command to upload the application archive and start the Drill cluster.
+     */
+    START("start", "Start the cluster."),
+
+    // Removed at QA request. QA wants a "real" restart. Also, upload of the
+    // archive is fast enough that a "start without upload" option is not really
+    // needed.
+//    /**
+//     * Convenience method when debugging, testing. Restarts the cluster without the
+//     * archive upload; assumes the upload was already done.
+//     */
+//
+//    RESTART( "restart", "Restart the cluster (without archive upload)."),
+
+    /**
+     * Primary command to stop a running cluster.
+     */
+    STOP("stop", "Stop the cluster."),
+
+    /**
+     * Primary command to get the status of a running cluster.
+     */
+    STATUS("status", "Provide the status of the cluster."),
+
+    RESIZE("resize", "Resize the cluster +n: add nodes, -n: remove nodes, n resize to given size."),
+
+    // No command word or description: matched by its enum name and hidden
+    // from the usage output.
+    TEST(null, null),
+
+    /**
+     * Convenience command to display the effective configuration settings to
+     * diagnose problems.
+     */
+    DESCRIBE("describe", "Display and validate configuration."),
+
+    /**
+     * Convenience command to upload the application archive to test the DFS
+     * settings without launching the Drill cluster.
+     */
+    UPLOAD("upload", "Upload archives to validate DFS."),
+
+    /**
+     * Convenience command to remove the Drill-on-YARN archive(s) from DFS.
+     * Note: doing this while a Drill cluster is running will cause subsequent
+     * Drillbit launches to fail.
+     */
+    CLEAN("clean", "Remove archives stored in DFS.");
+
+    private final String cmd;
+    private final String descrip;
+
+    Command(String cmd, String descrip) {
+      this.cmd = cmd;
+      this.descrip = descrip;
+    }
+
+    /**
+     * Case-insensitive match against the command word, or against the enum
+     * name when the command has no word.
+     */
+    public boolean isMatch(String arg) {
+      if (cmd == null) {
+        return toString().equalsIgnoreCase(arg);
+      }
+      return cmd.equalsIgnoreCase(arg);
+    }
+
+    /** A command without a description is hidden. */
+    public boolean isHidden() {
+      return descrip == null;
+    }
+
+    public String getCommand() {
+      return cmd;
+    }
+
+    public String getDescription() {
+      return descrip;
+    }
+  }
+
+ // Command selected by parse(); null when no command word was recognized
+ // or the arguments were rejected.
+ Command command;
+ // Value given with -a / --appid; null when the option was not supplied.
+ public String appId;
+ // Set by -d / --dryrun.
+ public boolean dryRun;
+ // Sign part of the resize argument: "+", "-" or the empty string.
+ public String resizePrefix;
+ // Numeric part of the resize argument.
+ public int resizeValue;
+ // Set by -v / --verbose.
+ public boolean verbose = false;
+ // Set by -f / --force.
+ public boolean force = false;
+
+  /**
+   * Parse the command line. Flags may appear in any position; exactly one
+   * command word is expected. When the command word is not recognized, or
+   * the resize argument is missing or malformed, the command is left null
+   * and the method still returns true.
+   *
+   * @param args the raw command line arguments
+   * @return false when the app id option has no value or a second command
+   *         word appears; true otherwise
+   */
+  public boolean parse(String args[]) {
+    for (int pos = 0; pos < args.length; pos++) {
+      final String word = args[pos];
+      switch (word) {
+      case "-h":
+      case "-?":
+        // Help wins; nothing after it is examined.
+        command = Command.HELP;
+        return true;
+      case "-v":
+      case "--verbose":
+        verbose = true;
+        continue;
+      case "-f":
+      case "--force":
+        force = true;
+        continue;
+      case "-d":
+      case "--dryrun":
+        dryRun = true;
+        continue;
+      case "-a":
+      case "--appid":
+        if (pos + 1 == args.length) {
+          return false;
+        }
+        appId = args[++pos];
+        continue;
+      default:
+        break;
+      }
+
+      // Anything else is a command word; only one is allowed.
+
+      if (command != null) {
+        command = null;
+        return false;
+      }
+
+      // Check if a command line word matches a command. Be nice,
+      // allow -foo and --foo in addition to the "proper" foo.
+
+      String name = word;
+      if (name.startsWith("--")) {
+        name = name.substring(2);
+      } else if (name.startsWith("-")) {
+        name = name.substring(1);
+      }
+      Command match = null;
+      for (Command candidate : Command.values()) {
+        if (candidate.isMatch(name)) {
+          match = candidate;
+          break;
+        }
+      }
+      if (match != Command.RESIZE) {
+        // Either a plain command or no match at all (null).
+        command = match;
+      } else if (pos + 1 < args.length) {
+        // Resize consumes the next word as its size argument.
+        command = match;
+        parseResizeOption(args[++pos]);
+      }
+      // Resize without an argument: command stays null.
+    }
+    return true;
+  }
+
+  /**
+   * Parses the resize argument: an optional sign followed by digits. On
+   * success sets the resize prefix and value. If the argument is malformed,
+   * or its number does not fit in an int, clears the command to mark the
+   * command line as invalid.
+   *
+   * @param resize the word following the resize command
+   */
+  private void parseResizeOption(String resize) {
+    Pattern p = Pattern.compile("([+-]?)(\\d+)");
+    Matcher m = p.matcher(resize);
+    if (!m.matches()) {
+      command = null;
+      return;
+    }
+    try {
+      resizeValue = Integer.parseInt(m.group(2));
+      resizePrefix = m.group(1);
+    } catch (NumberFormatException e) {
+      // All digits, but too large for an int (e.g. "99999999999"): treat it
+      // as an invalid argument rather than crashing the client.
+      command = null;
+    }
+  }
+
+  /**
+   * @return the parsed command, or null when none was recognized
+   */
+  public Command getCommand() {
+    return this.command;
+  }
+
+  /**
+   * @return the sign part of the resize argument
+   */
+  public String getResizePrefix() {
+    return this.resizePrefix;
+  }
+
+  /**
+   * @return the numeric part of the resize argument
+   */
+  public int getResizeValue() {
+    return this.resizeValue;
+  }
+
+  /**
+   * Prints the usage summary, followed by one line for each command that is
+   * not hidden, to the client output stream.
+   */
+  public void usage() {
+    ClientContext.out.println(
+        "Usage: drill-on-yarn.sh [--site site-dir] command [-v|--verbose][-a app-id]");
+    ClientContext.out.println("Where command is one of:");
+    for (Command cmd : Command.values()) {
+      if (!cmd.isHidden()) {
+        final String line = " " + cmd.getCommand() + " - " + cmd.getDescription();
+        ClientContext.out.println(line);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/client/DrillOnYarn.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/client/DrillOnYarn.java b/drill-yarn/src/main/java/org/apache/drill/yarn/client/DrillOnYarn.java
new file mode 100644
index 0000000..587766a
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/client/DrillOnYarn.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.client;
+
+
+import org.apache.drill.yarn.core.DoyConfigException;
+import org.apache.drill.yarn.core.DrillOnYarnConfig;
+import org.apache.log4j.BasicConfigurator;
+
+/**
+ * Client for the Drill-on-YARN integration. See YARN documentation
+ * for the role of a YARN client.
+ * <p>
+ * The client needs configuration information from drill-on-yarn.conf,
+ * the directory of which must be in the class path. It is put there
+ * by the drill-on-yarn.sh script.
+ * <p>
+ * The client also requires a debugging configuration file to be given
+ * explicitly as follows:<br>
+ * -Dlogback.configurationFile=/path/to/yarn-client-log.xml<br>
+ * The drillbit itself uses the default logging config file name of
+ * logback.xml; which contains references to system properties that are
+ * not defined in this client. The result of not including the log
+ * configuration file is that you'll see various var.name_IS_UNDEFINED
+ * files in the directory from which you launched the client.
+ * <p>
+ * The client accepts a command, creates a command object for that
+ * command, and executes it. There are a few main commands (start, stop),
+ * along with some management commands (status, resize), and a few commands
+ * mostly used for debugging and diagnosis (upload, etc.). Some commands
+ * are very similar, so a single command object may handle multiple user
+ * commands.
+ * <p>
+ * The client requires a working distributed file system (DFS), the
+ * configuration of which is given either implicitly, or in the Hadoop
+ * configuration files. Similarly, the client requires a working YARN
+ * deployment, again with either implicit configuration or configuration
+ * given in the Hadoop configuration. The Hadoop configuration must be
+ * on the class path when launching this client.
+ *
+ * <h3>Debugging</h3>
+ * <p>
+ * To debug this class, add two or three directories to your class path:
+ * <ul>
+ * <li>$DRILL_CONF_DIR (if using a separate site directory)</li>
+ * <li>$HADOOP_HOME/etc/hadoop</li>
+ * <li>$DRILL_HOME/conf</li>
+ * </ul>
+ * Note that these MUST be in the order listed since $DRILL_HOME/conf
+ * contains, by default, a version of core-site.xml that probably is
+ * NOT the one you want to use for YARN. For YARN, you want the one
+ * in $HADOOP_HOME/etc/hadoop.
+ * <p>
+ * Also, set the following VM argument:<br>
+ * -Dlogback.configurationFile=/path/to/drill/conf/yarn-client-log.xml<br>
+ * or<br>
+ * -Dlogback.configurationFile=/path/to/drill-site/yarn-client-log.xml<br>
+ */
+
+public class DrillOnYarn {
+  /**
+   * Command-line entry point: configures basic logging, initializes the
+   * client context and hands the arguments to {@link #run(String[])}.
+   *
+   * @param argv the command-line arguments
+   */
+  public static void main(String[] argv) {
+    BasicConfigurator.configure();
+    ClientContext.init();
+    run(argv);
+  }
+
+  /**
+   * Parses the command line, loads the Drill-on-YARN configuration,
+   * creates the command object for the requested command and runs it.
+   * Failures are reported on the client's error stream and result in a
+   * call to the context's exit method.
+   *
+   * @param argv the command-line arguments
+   */
+  public static void run(String[] argv) {
+    ClientContext context = ClientContext.instance();
+
+    // Parse command-line options. A failed parse and a missing command
+    // are reported the same way: usage text and a failure exit.
+    // Return explicitly after every exit request so that nothing below
+    // runs with a null command should exit() ever return to the caller.
+
+    CommandLineOptions opts = new CommandLineOptions();
+    if (!opts.parse(argv) || opts.getCommand() == null) {
+      opts.usage();
+      context.exit(-1);
+      return;
+    }
+
+    // Load configuration.
+
+    try {
+      DrillOnYarnConfig.load().setClientPaths();
+    } catch (DoyConfigException e) {
+      ClientContext.err.println(e.getMessage());
+      context.exit(-1);
+      return;
+    }
+
+    // Create the required command object.
+    //
+    // A RESTART command (start without upload) was removed at QA
+    // request: QA wants a "real" restart, and upload of the archive is
+    // fast enough that a "start without upload" option is not really
+    // needed.
+
+    ClientCommand cmd;
+    switch (opts.getCommand()) {
+    case UPLOAD:
+      cmd = new StartCommand(true, false);
+      break;
+    case START:
+      cmd = new StartCommand(true, true);
+      break;
+    case DESCRIBE:
+      cmd = new PrintConfigCommand();
+      break;
+    case STATUS:
+      cmd = new StatusCommand();
+      break;
+    case STOP:
+      cmd = new StopCommand();
+      break;
+    case CLEAN:
+      cmd = new CleanCommand();
+      break;
+    case RESIZE:
+      cmd = new ResizeCommand();
+      break;
+    default:
+      cmd = new HelpCommand();
+    }
+
+    // Run the command.
+
+    cmd.setOpts(opts);
+    try {
+      cmd.run();
+    } catch (ClientException e) {
+      displayError(opts, e);
+      context.exit(1);
+    }
+  }
+
+  /**
+   * Prints a client error: the exception's own message, then one
+   * "Caused by" line per exception in the cause chain, then the full
+   * stack trace when the verbose option is set.
+   *
+   * @param opts parsed command-line options (consulted for the verbose flag)
+   * @param e the exception to report
+   */
+  private static void displayError(CommandLineOptions opts, ClientException e) {
+
+    // Show the Drill-provided explanation of the error.
+
+    ClientContext.err.println(e.getMessage());
+
+    // Add the underlying exception information, if any. The check
+    // against the parent stops the walk if an exception reports itself
+    // as its own cause.
+
+    Throwable parent = e;
+    Throwable cause = e.getCause();
+    while (cause != null && cause != parent) {
+      ClientContext.err.print(" Caused by: ");
+      ClientContext.err.println(cause.getMessage());
+      parent = cause;
+      cause = cause.getCause();
+    }
+
+    // Include the full stack trace if requested.
+
+    if (opts.verbose) {
+      ClientContext.err.println("Full stack trace:");
+      e.printStackTrace(ClientContext.err);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/client/FileUploader.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/client/FileUploader.java b/drill-yarn/src/main/java/org/apache/drill/yarn/client/FileUploader.java
new file mode 100644
index 0000000..ace2d03
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/client/FileUploader.java
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.client;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.yarn.core.DfsFacade;
+import org.apache.drill.yarn.core.DoYUtil;
+import org.apache.drill.yarn.core.DoyConfigException;
+import org.apache.drill.yarn.core.DrillOnYarnConfig;
+import org.apache.drill.yarn.core.DfsFacade.DfsFacadeException;
+import org.apache.drill.yarn.core.DfsFacade.Localizer;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+
+import com.typesafe.config.Config;
+
+/**
+ * Performs the file upload portion of the operation by uploading an archive to
+ * the target DFS system and directory. Records the uploaded archive so it may
+ * be used for localizing Drill in the launch step.
+ * <p>
+ * Some of the code is a bit of a dance so we can get information early to
+ * display in status messages.
+ * <p>
+ * This class handles four cases:
+ * <ol>
+ * <li>Non-localized, config in $DRILL_HOME/conf.</li>
+ * <li>Non-localized, config in a site directory.</li>
+ * <li>Localized, config in $DRILL_HOME.</li>
+ * <li>Localized, config in a site directory.</li>
+ * </ol>
+ * <p>
+ * The non-localized case adds complexity, but is very handy when doing
+ * development as it avoids the wait for the archives to up- and down-load. The
+ * non-localized mode is not advertised to users as it defeats one of the main
+ * benefits of YARN.
+ * <p>
+ * In the localized case, YARN is incomplete; there is no API to inform the AM
+ * of the set of localized files, so we pass the information along in
+ * environment variables. Also, tar is a bit annoying because it includes the
+ * root directory name when unpacking, so that the drill.tar.gz archive unpacks
+ * to, say, apache-drill.x.y.z. So, we must pass along the directory name as
+ * well.
+ * <p>
+ * All of this is further complicated by the way YARN needs detailed information
+ * to localize resources, and that YARN uses a "key" to identify localized
+ * resources, which becomes the directory name in the task's working folder.
+ * Thus, Drill becomes, say<br>
+ * $PWD/drill/apache-drill.x.y.z/bin, conf, ...<br>
+ * YARN provides PWD. The Drillbit launch script needs to know the next two
+ * directory names.
+ * <p>
+ * For efficiency, we omit uploading the Drill archive if one already exists in
+ * dfs and is the same size as the one on the client. We always upload the
+ * config archive (if needed) because config changes are likely to be one reason
+ * that someone (re)starts the Drill cluster.
+ */
+
+public abstract class FileUploader {
+ protected DrillOnYarnConfig doyConfig;
+ protected Config config;
+ protected DfsFacade dfs;
+ protected boolean dryRun;
+ protected boolean verbose;
+ protected File localDrillHome;
+ protected File localSiteDir;
+ protected File localDrillArchivePath;
+
+ public Map<String, LocalResource> resources = new HashMap<>();
+ public String drillArchivePath;
+ public String siteArchivePath;
+ public String remoteDrillHome;
+ public String remoteSiteDir;
+
+ /**
+  * Uploader for the non-localized case: nothing is uploaded. It only
+  * resolves the directories and warns when the remote Drill home or
+  * remote site directory has not been configured.
+  */
+ public static class NonLocalized extends FileUploader {
+   public NonLocalized(boolean dryRun, boolean verbose) {
+     super(dryRun, verbose);
+   }
+
+   @Override
+   public void run() throws ClientException {
+     setup();
+
+     // We need the drill home property. The client can figure out the
+     // Drill home, but the AM must be told.
+
+     warnIfUnset(DrillOnYarnConfig.DRILL_HOME,
+         "Assuming remote Drill home is the same as the local location: ",
+         localDrillHome);
+     if (hasSiteDir()) {
+       warnIfUnset(DrillOnYarnConfig.SITE_DIR,
+           "Assuming remote Drill site is the same as the local location: ",
+           localSiteDir);
+     }
+     if (verbose || dryRun) {
+       dump(System.out);
+     }
+   }
+
+   /**
+    * Prints a two-line warning when the given configuration property is
+    * blank.
+    *
+    * @param propName name of the configuration property to check
+    * @param assumption text that introduces the local path assumed instead
+    * @param localDir local directory whose absolute path is reported
+    */
+   private void warnIfUnset(String propName, String assumption, File localDir) {
+     String configured = config.getString(propName);
+     if (!DoYUtil.isBlank(configured)) {
+       return;
+     }
+     System.out.println("Warning: non-localized run "
+         + propName + " is not set.");
+     System.out.println(assumption + localDir.getAbsolutePath());
+   }
+ }
+
+ /**
+  * Uploader variant that uploads nothing: it verifies that the archives
+  * are already present in DFS and registers them as resources.
+  */
+ public static class ReuseFiles extends FileUploader {
+   public ReuseFiles(boolean dryRun, boolean verbose) {
+     super(dryRun, verbose);
+   }
+
+   @Override
+   public void run() throws ClientException {
+     setup();
+     checkDrillArchive();
+     if (hasSiteDir()) {
+       checkSiteArchive();
+     }
+     if (verbose || dryRun) {
+       dump(System.out);
+     }
+   }
+
+   /**
+    * Verifies that the Drill archive already exists in DFS and registers
+    * it as a resource. If the local archive is also present and differs
+    * from the DFS copy, a warning is printed, but the DFS copy is still
+    * used.
+    *
+    * @throws ClientException if the DFS connection fails, the archive is
+    * missing from DFS, or its existence or status cannot be determined
+    */
+
+   private void checkDrillArchive() throws ClientException {
+     // Connect before creating the localizer: the localizer is built
+     // around the dfs field, which is only assigned by connectToDfs().
+
+     connectToDfs();
+     DfsFacade.Localizer localizer = makeDrillLocalizer();
+     try {
+       if (!localizer.destExists()) {
+         throw new ClientException(
+             "Drill archive not found in DFS: " + drillArchivePath);
+       }
+     } catch (IOException e) {
+       throw new ClientException(
+           "Failed to check existence of " + drillArchivePath, e);
+     }
+
+     // The comparison is only meaningful when a local copy exists. A
+     // missing local copy must not prevent the DFS archive from being
+     // registered below.
+
+     if (localDrillArchivePath.exists() && !localizer.filesMatch()) {
+       System.out.println(
+           "Warning: Drill archive on DFS does not match the local version.");
+     }
+     defineResources(localizer, DrillOnYarnConfig.DRILL_ARCHIVE_KEY);
+   }
+
+   /**
+    * Verifies that the site archive already exists in DFS and registers
+    * it as a resource. Relies on the DFS connection opened by
+    * {@code checkDrillArchive()}.
+    *
+    * @throws ClientException if the site archive is missing from DFS, or
+    * its existence or status cannot be determined
+    */
+
+   private void checkSiteArchive() throws ClientException {
+     // No local archive file is involved when reusing the uploaded site
+     // archive, hence the null argument.
+
+     DfsFacade.Localizer localizer = makeSiteLocalizer(null);
+     try {
+       if (!localizer.destExists()) {
+         throw new ClientException(
+             "Site archive not found in DFS: " + siteArchivePath);
+       }
+     } catch (IOException e) {
+       throw new ClientException(
+           "Failed to check existence of " + siteArchivePath, e);
+     }
+     defineResources(localizer, DrillOnYarnConfig.SITE_ARCHIVE_KEY);
+   }
+ }
+
+ public static class UploadFiles extends FileUploader {
+ private boolean force;
+
+ public UploadFiles(boolean force, boolean dryRun, boolean verbose) {
+ super(dryRun, verbose);
+ this.force = force;
+ }
+
+ @Override
+ public void run() throws ClientException {
+ setup();
+ uploadDrillArchive();
+ if (hasSiteDir()) {
+ uploadSite();
+ }
+ if (verbose || dryRun) {
+ dump(System.out);
+ }
+ }
+
+ /**
+ * Create a temporary archive of the site directory and upload it to DFS. We
+ * always upload the site; we never reuse an existing one.
+ *
+ * @throws ClientException
+ */
+
+ private void uploadSite() throws ClientException {
+ File siteArchive = createSiteArchive();
+ try {
+ uploadSiteArchive(siteArchive);
+ } finally {
+ siteArchive.delete();
+ }
+ }
+
+ /**
+ * Upload the Drill archive if desired. Skip the upload if the file already
+ * exists in dfs and is the same size as the local file. However using the
+ * force option can force an upload even if the sizes match.
+ * <p>
+ * Prepares the information needed to tell YARN and the AM about the
+ * localized archive.
+ * <p>
+ * Note that the Drill archive is not created by this client; it must
+ * already exist on disk. Typically, it is just the archive downloaded from
+ * Apache or some other distribution. The uploaded archive retains the name
+ * of the archive in the client, which may be useful to check the version of
+ * the uploaded code based on the file name.
+ *
+ * @throws ClientException
+ */
+
+ private void uploadDrillArchive() throws ClientException {
+ // Print the progress message here because doing the connect takes
+ // a while and the message makes it look like we're doing something.
+
+ connectToDfs();
+ DfsFacade.Localizer localizer = makeDrillLocalizer();
+ boolean needsUpload = force || !localizer.filesMatch();
+
+ if (needsUpload) {
+ // Thoroughly check the Drill archive. Errors with the archive seem a
+ // likely source of confusion, so provide detailed error messages for
+ // common cases. Don't bother with these checks if no upload is needed.
+
+ if (!localDrillArchivePath.exists()) {
+ throw new ClientException(
+ "Drill archive not found: " + localDrillArchivePath.getAbsolutePath());
+ }
+ if (!localDrillArchivePath.canRead()) {
+ throw new ClientException(
+ "Drill archive is not readable: " + localDrillArchivePath.getAbsolutePath());
+ }
+ if (localDrillArchivePath.isDirectory()) {
+ throw new ClientException(
+ "Drill archive cannot be a directory: " + localDrillArchivePath.getAbsolutePath());
+ }
+ }
+
+ drillArchivePath = localizer.getDestPath();
+ if (needsUpload) {
+ if (dryRun) {
+ System.out.print(
+ "Upload " + localDrillArchivePath.getAbsolutePath() + " to " + drillArchivePath);
+ } else {
+ System.out.print("Uploading " + localDrillArchivePath.getAbsolutePath() + " to "
+ + drillArchivePath + " ... ");
+ upload(localizer);
+ }
+ } else {
+ System.out.println(
+ "Using existing Drill archive in DFS: " + drillArchivePath);
+ }
+
+ defineResources(localizer, DrillOnYarnConfig.DRILL_ARCHIVE_KEY);
+ }
+
+ /**
+ * Run the tar command to archive the site directory into a temporary
+ * archive which is then uploaded to DFS using a standardized name. The site
+ * directory is always uploaded since configuration is subject to frequent
+ * changes.
+ *
+ * @return
+ * @throws ClientException
+ */
+
+ private File createSiteArchive() throws ClientException {
+ File siteArchiveFile;
+ try {
+ siteArchiveFile = File.createTempFile("drill-site-", ".tar.gz");
+ } catch (IOException e) {
+ throw new ClientException("Failed to create site archive temp file", e);
+ }
+ String cmd[] = new String[] { "tar", "-C", localSiteDir.getAbsolutePath(),
+ "-czf", siteArchiveFile.getAbsolutePath(), "." };
+ List<String> cmdList = Arrays.asList(cmd);
+ String cmdLine = DoYUtil.join(" ", cmdList);
+ if (dryRun) {
+ System.out.print("Site archive command: ");
+ System.out.println(cmdLine);
+ return siteArchiveFile;
+ }
+
+ ProcessBuilder builder = new ProcessBuilder(cmdList);
+ builder.redirectErrorStream(true);
+ Process proc;
+ try {
+ proc = builder.start();
+ } catch (IOException e) {
+ throw new ClientException("Failed to launch tar process: " + cmdLine,
+ e);
+ }
+
+ // Should not be much output. But, we have to read it anyway to avoid
+ // blocking. We'll use the output if we encounter an error.
+
+ BufferedReader br = new BufferedReader(
+ new InputStreamReader(proc.getInputStream()));
+ StringBuilder buf = new StringBuilder();
+ try {
+ String line;
+ while ((line = br.readLine()) != null) {
+ buf.append(line);
+ buf.append("\n");
+ }
+ br.close();
+ } catch (IOException e) {
+ throw new ClientException("Failed to read output from tar command", e);
+ }
+ try {
+ proc.waitFor();
+ } catch (InterruptedException e) {
+ // Won't occur.
+ }
+ if (proc.exitValue() != 0) {
+ String msg = buf.toString().trim();
+ throw new ClientException("Tar of site directory failed: " + msg);
+ }
+ return siteArchiveFile;
+ }
+
+ /**
+ * Upload the site archive. For debugging, the client provides the option to
+ * use existing files, which users should not do in production.
+ *
+ * @param siteArchive
+ * @throws ClientException
+ */
+
+ private void uploadSiteArchive(File siteArchive) throws ClientException {
+ DfsFacade.Localizer localizer = makeSiteLocalizer(siteArchive);
+
+ if (dryRun) {
+ System.out.println("Upload site archive to " + siteArchivePath);
+ } else {
+ System.out
+ .print("Uploading site directory " + localSiteDir.getAbsolutePath() +
+ " to " + siteArchivePath + " ... ");
+ upload(localizer);
+ }
+ defineResources(localizer, DrillOnYarnConfig.SITE_ARCHIVE_KEY);
+ }
+ }
+
+ /**
+  * Captures the run options and the Drill-on-YARN configuration.
+  *
+  * @param dryRun true to only report what would be done
+  * @param verbose true to print the resolved settings after the run
+  */
+ public FileUploader(boolean dryRun, boolean verbose) {
+   this.dryRun = dryRun;
+   this.verbose = verbose;
+   this.doyConfig = DrillOnYarnConfig.instance();
+   this.config = this.doyConfig.getConfig();
+ }
+
+ /**
+  * Performs this uploader's work. The nested subclasses in this file
+  * each start by calling {@code setup()} and finish by dumping the
+  * resolved settings when the verbose or dry-run option is set.
+  *
+  * @throws ClientException if any step of the operation fails
+  */
+ public abstract void run() throws ClientException;
+
+ /**
+ * Common setup of the Drill and site directories.
+ *
+ * @throws ClientException
+ */
+
+ protected void setup() throws ClientException {
+
+   // Resolve where Drill lives on this machine and on the cluster.
+
+   localDrillHome = doyConfig.getLocalDrillHome();
+   try {
+     remoteDrillHome = doyConfig.getRemoteDrillHome();
+   } catch (DoyConfigException configError) {
+     throw new ClientException(configError);
+   }
+
+   // A site directory is optional. When one is configured it must be an
+   // actual directory; only then is the remote location looked up.
+
+   localSiteDir = doyConfig.getLocalSiteDir();
+   if (hasSiteDir()) {
+     boolean siteIsDirectory = localSiteDir.isDirectory();
+     if (!siteIsDirectory) {
+       throw new ClientException(
+           "Drill site dir not a directory: " + localSiteDir);
+     }
+     remoteSiteDir = doyConfig.getRemoteSiteDir();
+   }
+
+   // Make it obvious when nothing will actually be changed.
+
+   if (dryRun) {
+     System.out.println("Dry run only.");
+   }
+ }
+
+ public boolean hasSiteDir() {
+ return localSiteDir != null;
+ }
+
+ /**
+ * Report whether the user wants to localize (upload) Drill files, or just use
+ * files already on the worker nodes.
+ *
+ * @return
+ */
+
+ public boolean isLocalized() {
+ return config.getBoolean(DrillOnYarnConfig.LOCALIZE_DRILL);
+ }
+
+ /**
+  * Creates the DFS facade from the configuration and connects it,
+  * printing a one-line progress message.
+  *
+  * @throws ClientException if the connection attempt fails
+  */
+ protected void connectToDfs() throws ClientException {
+   // Announce first and without a newline; the outcome completes the line.
+   System.out.print("Connecting to DFS...");
+   try {
+     dfs = new DfsFacade(config);
+     dfs.connect();
+     System.out.println(" Connected.");
+   } catch (DfsFacadeException connectError) {
+     System.out.println("Failed.");
+     throw new ClientException("Failed to connect to DFS", connectError);
+   }
+ }
+
+ protected Localizer makeDrillLocalizer() throws ClientException {
+ String localArchivePath = config
+ .getString(DrillOnYarnConfig.DRILL_ARCHIVE_PATH);
+ if (DoYUtil.isBlank(localArchivePath)) {
+ throw new ClientException("Drill archive path ("
+ + DrillOnYarnConfig.DRILL_ARCHIVE_PATH + ") is not set.");
+ }
+
+ // Archive is either absolute, or relative to $DRILL_HOME.
+
+ localDrillArchivePath = new File(localArchivePath);
+ if (!localDrillArchivePath.isAbsolute()) {
+ localDrillArchivePath = new File(
+ DrillOnYarnConfig.instance().getLocalDrillHome(), localArchivePath);
+ }
+ DfsFacade.Localizer localizer = new DfsFacade.Localizer(dfs,
+ localDrillArchivePath, "Drill");
+ drillArchivePath = localizer.getDestPath();
+ return localizer;
+ }
+
+ /**
+  * Builds the localizer for the site archive, recording the archive's
+  * destination path as a side effect.
+  *
+  * @param siteArchive the local site archive file; one caller in this
+  * file passes null when no local file is involved
+  * @return the localizer for the site archive
+  */
+ protected Localizer makeSiteLocalizer(File siteArchive) {
+   Localizer siteLocalizer = new Localizer(dfs, siteArchive,
+       DrillOnYarnConfig.SITE_ARCHIVE_NAME, "Site");
+   siteArchivePath = siteLocalizer.getDestPath();
+   return siteLocalizer;
+ }
+
+ /**
+  * Runs the localizer's upload and prints the outcome, completing the
+  * progress line started by the caller.
+  *
+  * @param localizer the localizer whose file is to be uploaded
+  * @throws ClientException if the upload fails
+  */
+ protected void upload(Localizer localizer) throws ClientException {
+   try {
+     localizer.upload();
+     System.out.println("Uploaded.");
+   } catch (DfsFacadeException uploadError) {
+     System.out.println("Failed.");
+     String label = localizer.getLabel();
+     throw new ClientException(
+         "Failed to upload " + label + " archive", uploadError);
+   }
+ }
+
+ protected void defineResources(Localizer localizer, String keyProp)
+ throws ClientException {
+ String key = config.getString(keyProp);
+ try {
+ localizer.defineResources(resources, key);
+ } catch (DfsFacadeException e) {
+ throw new ClientException(
+ "Failed to get DFS status for " + localizer.getLabel() + " archive",
+ e);
+ }
+ }
+
+ protected void dump(PrintStream out) {
+ out.print("Localized: ");
+ out.println((isLocalized()) ? "Yes" : "No");
+ out.print("Has Site Dir: ");
+ out.println((hasSiteDir()) ? "Yes" : "No");
+ out.print("Local Drill home: ");
+ out.println(localDrillHome.getAbsolutePath());
+ out.print("Remote Drill home: ");
+ out.println(remoteDrillHome);
+ if (hasSiteDir()) {
+ out.print("Local Site dir: ");
+ out.println(localSiteDir.getAbsolutePath());
+ out.print("Remote Site dir: ");
+ out.println(remoteSiteDir);
+ }
+ if (isLocalized()) {
+ out.print("Drill archive DFS path: ");
+ out.println(drillArchivePath);
+ if (hasSiteDir()) {
+ out.print("Site archive DFS path: ");
+ out.println(siteArchivePath);
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/client/HelpCommand.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/client/HelpCommand.java b/drill-yarn/src/main/java/org/apache/drill/yarn/client/HelpCommand.java
new file mode 100644
index 0000000..3e7e5d9
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/client/HelpCommand.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.client;
+
+/**
+ * Command that prints the client's usage text.
+ */
+public class HelpCommand extends ClientCommand {
+
+  @Override
+  public void run() {
+    this.opts.usage();
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/client/KillCommand.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/client/KillCommand.java b/drill-yarn/src/main/java/org/apache/drill/yarn/client/KillCommand.java
new file mode 100644
index 0000000..8b7914c
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/client/KillCommand.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.client;
+
+import org.apache.drill.yarn.core.YarnClientException;
+import org.apache.drill.yarn.core.YarnRMClient;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * Command that asks YARN to kill the application and then waits for the
+ * application to complete.
+ */
+public class KillCommand extends ClientCommand {
+
+  @Override
+  public void run() throws ClientException {
+    final ApplicationId appId = checkAppId();
+    if (appId == null) {
+      System.exit(-1);
+    }
+    final YarnRMClient client = new YarnRMClient(appId);
+    requestKill(client);
+    System.out.println("Kill request sent, waiting for shut-down.");
+    awaitCompletion(client, appId);
+    System.out.println("Application completed: " + appId.toString());
+  }
+
+  /**
+   * Sends the kill request, converting a YARN client failure into a
+   * client exception.
+   */
+  private void requestKill(YarnRMClient client) throws ClientException {
+    try {
+      client.killApplication();
+    } catch (YarnClientException killError) {
+      throw new ClientException(killError);
+    }
+  }
+
+  /**
+   * Waits for the application to complete, converting a YARN client
+   * failure into a client exception that names the application.
+   */
+  private void awaitCompletion(YarnRMClient client, ApplicationId appId)
+      throws ClientException {
+    try {
+      client.waitForCompletion();
+    } catch (YarnClientException waitError) {
+      throw new ClientException(
+          "Wait for completion failed for app id: " + appId.toString(),
+          waitError);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/client/PrintConfigCommand.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/client/PrintConfigCommand.java b/drill-yarn/src/main/java/org/apache/drill/yarn/client/PrintConfigCommand.java
new file mode 100644
index 0000000..69cdf55
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/client/PrintConfigCommand.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.client;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import org.apache.drill.yarn.core.DrillOnYarnConfig;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Diagnostic command that prints the effective Drill-on-YARN
+ * configuration followed by the YARN configuration to standard output.
+ */
+public class PrintConfigCommand extends ClientCommand {
+  @Override
+  public void run() {
+    // Dump configuration if requested for diagnostic use.
+
+    System.out.println("----------------------------------------------");
+    System.out.println("Effective Drill-on-YARN Configuration");
+    DrillOnYarnConfig.instance().dump();
+    System.out.println("----------------------------------------------");
+
+    // Dump YARN configuration.
+
+    System.out.println("YARN, DFS and Hadoop Configuration");
+    YarnConfiguration conf = new YarnConfiguration();
+    try {
+      OutputStreamWriter writer = new OutputStreamWriter(System.out);
+      YarnConfiguration.dumpConfiguration(conf, writer);
+
+      // Flush explicitly so nothing stays buffered in the writer, but
+      // do not close it: closing would also close System.out.
+
+      writer.flush();
+      System.out.println();
+    } catch (IOException e) {
+      // Best effort only: report the problem rather than hiding it, and
+      // still print the closing separator below.
+
+      System.err.println(
+          "Failed to dump the YARN configuration: " + e.getMessage());
+    }
+    System.out.println("----------------------------------------------");
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/client/ResizeCommand.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/client/ResizeCommand.java b/drill-yarn/src/main/java/org/apache/drill/yarn/client/ResizeCommand.java
new file mode 100644
index 0000000..43ae02c
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/client/ResizeCommand.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.client;
+
+import org.apache.drill.yarn.core.DoYUtil;
+import org.apache.drill.yarn.core.DrillOnYarnConfig;
+import org.apache.drill.yarn.core.YarnRMClient;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
+import com.typesafe.config.Config;
+
+/**
+ * Client command that asks the Drill Application Master (AM) to grow, shrink
+ * or resize the cluster by sending a POST to the AM's REST interface.
+ */
+public class ResizeCommand extends ClientCommand {
+  private Config config;
+  private YarnRMClient client;
+
+  /**
+   * Looks up the application report to find the AM URL, then sends the
+   * resize request selected by the command-line prefix: {@code "+"} grows
+   * the cluster by the given quantity, {@code "-"} shrinks it by that
+   * quantity, and anything else resizes it to exactly that quantity.
+   *
+   * @throws ClientException
+   *           if the application report cannot be obtained; without a report
+   *           there is no AM URL to send the request to
+   */
+  @Override
+  public void run() throws ClientException {
+    config = DrillOnYarnConfig.config();
+    client = getClient();
+    System.out.println(
+        "Resizing cluster for Application ID: " + client.getAppId().toString());
+
+    // First get an application report to ensure that the AM is,
+    // in fact, running, and to get the HTTP endpoint. A failure here is
+    // propagated: the previous code nulled the reporter and then
+    // dereferenced it, which always ended in a NullPointerException.
+
+    StatusCommand.Reporter reporter = new StatusCommand.Reporter(client);
+    reporter.getReport();
+
+    String prefix = opts.resizePrefix;
+    int quantity = opts.resizeValue;
+    String cmd;
+    // Constant-first comparisons so a missing prefix falls through to
+    // "resize" instead of throwing.
+    if ("+".equals(prefix)) {
+      cmd = "grow";
+      if (opts.verbose) {
+        System.out.println("Growing cluster by " + quantity + " nodes.");
+      }
+    } else if ("-".equals(prefix)) {
+      cmd = "shrink";
+      if (opts.verbose) {
+        System.out.println("Shrinking cluster by " + quantity + " nodes.");
+      }
+    } else {
+      cmd = "resize";
+      if (opts.verbose) {
+        System.out.println("Resizing cluster to " + quantity + " nodes.");
+      }
+    }
+    if (sendResize(reporter.getAmUrl(), cmd, quantity)) {
+      System.out.println("Use web UI or status command to check progress.");
+    }
+  }
+
+  /**
+   * Sends the resize request to the AM and reports the outcome to the user.
+   *
+   * @param baseUrl
+   *          base URL of the AM web server; a blank value makes this method
+   *          return {@code false} without sending anything
+   * @param cmd
+   *          REST verb segment: {@code grow}, {@code shrink} or
+   *          {@code resize}
+   * @param quantity
+   *          node count appended to the REST path
+   * @return {@code true} only if the AM answered with a JSON object whose
+   *         {@code status} field is {@code "ok"}
+   */
+  private boolean sendResize(String baseUrl, String cmd, int quantity) {
+    try {
+      if (DoYUtil.isBlank(baseUrl)) {
+        return false;
+      }
+      SimpleRestClient restClient = new SimpleRestClient();
+      String tail = "rest/" + cmd + "/" + quantity;
+      String masterKey = config.getString(DrillOnYarnConfig.HTTP_REST_KEY);
+      if (!DoYUtil.isBlank(masterKey)) {
+        // NOTE(review): the key is appended without URL-encoding and is
+        // echoed by the verbose message below - confirm both are acceptable.
+        tail += "?key=" + masterKey;
+      }
+      if (opts.verbose) {
+        System.out.println("Resizing with POST " + baseUrl + "/" + tail);
+      }
+      String result = restClient.send(baseUrl, tail, true);
+
+      JSONParser parser = new JSONParser();
+      Object response;
+      try {
+        response = parser.parse(result);
+      } catch (ParseException e) {
+        System.err.println("Invalid response received from AM");
+        if (opts.verbose) {
+          System.out.println(result);
+          System.out.println(e.getMessage());
+        }
+        return false;
+      }
+
+      // Well-formed JSON need not be an object (it may be an array, a
+      // scalar or null); check before casting so such a reply is reported
+      // as invalid rather than surfacing as a ClassCastException.
+      if (!(response instanceof JSONObject)) {
+        System.err.println("Invalid response received from AM");
+        if (opts.verbose) {
+          System.out.println(result);
+        }
+        return false;
+      }
+      JSONObject root = (JSONObject) response;
+
+      System.out.println("AM responded: " + root.get("message"));
+      if ("ok".equals(root.get("status"))) {
+        return true;
+      }
+      System.err.println("Failed to resize the application master.");
+      return false;
+    } catch (ClientException e) {
+      System.err.println("Resize failed: " + e.getMessage());
+      return false;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/client/SimpleRestClient.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/client/SimpleRestClient.java b/drill-yarn/src/main/java/org/apache/drill/yarn/client/SimpleRestClient.java
new file mode 100644
index 0000000..e47fb58
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/client/SimpleRestClient.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.client;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+/**
+ * Minimal REST client that issues a single GET or POST request with no
+ * request body and returns the response body as a string.
+ */
+public class SimpleRestClient {
+  /**
+   * Sends one request to {@code baseUrl + "/" + resource} and returns the
+   * response body.
+   * <p>
+   * The body is read line by line and the lines are concatenated without
+   * their line terminators; the result is then trimmed. The HTTP status code
+   * is not inspected.
+   *
+   * @param baseUrl
+   *          base URL of the server, with or without a trailing slash; must
+   *          not be null
+   * @param resource
+   *          path (and optional query string) appended to the base URL
+   * @param isPost
+   *          {@code true} to send a POST, {@code false} to send a GET
+   * @return the trimmed response body with line breaks removed
+   * @throws ClientException
+   *           if the request fails, the response cannot be read, or the
+   *           response carries no body
+   */
+  public String send(String baseUrl, String resource, boolean isPost)
+      throws ClientException {
+    String url = baseUrl;
+    if (!url.endsWith("/")) {
+      url += "/";
+    }
+    url += resource;
+    HttpClient client = new DefaultHttpClient();
+    try {
+      HttpRequestBase request;
+      if (isPost) {
+        request = new HttpPost(url);
+      } else {
+        request = new HttpGet(url);
+      }
+
+      HttpResponse response = client.execute(request);
+      // A response without an entity (for example 204 No Content) would
+      // otherwise fail with a NullPointerException on getContent().
+      if (response.getEntity() == null) {
+        throw new ClientException("REST request returned no content: " + url);
+      }
+
+      // Decode explicitly rather than with the platform default charset.
+      // NOTE(review): assumes the server replies in UTF-8 (the callers in
+      // this package parse the reply as JSON) - confirm.
+      BufferedReader rd = new BufferedReader(
+          new InputStreamReader(response.getEntity().getContent(), "UTF-8"));
+      try {
+        StringBuilder buf = new StringBuilder();
+        String line;
+        while ((line = rd.readLine()) != null) {
+          buf.append(line);
+        }
+        return buf.toString().trim();
+      } finally {
+        // Closing the reader closes the entity's content stream.
+        rd.close();
+      }
+    } catch (ClientProtocolException e) {
+      throw new ClientException("Internal REST error", e);
+    } catch (IllegalStateException e) {
+      throw new ClientException("Internal REST error", e);
+    } catch (IOException e) {
+      throw new ClientException("REST request failed: " + url, e);
+    } finally {
+      // The client is created per call, so release its connections here;
+      // nothing else holds a reference that could do so later.
+      client.getConnectionManager().shutdown();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/client/StartCommand.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/client/StartCommand.java b/drill-yarn/src/main/java/org/apache/drill/yarn/client/StartCommand.java
new file mode 100644
index 0000000..fe505a2
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/client/StartCommand.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.client;
+
+import java.io.File;
+
+import org.apache.drill.yarn.client.StatusCommand.Reporter;
+import org.apache.drill.yarn.core.DrillOnYarnConfig;
+import org.apache.drill.yarn.core.YarnRMClient;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+import com.typesafe.config.Config;
+
+/**
+ * Launches a drill cluster by uploading the Drill archive then launching the
+ * Drill Application Master (AM). For testing, can also do just the upload or
+ * just the launch. Handles both a localized Drill and a non-localized launch
+ * (which uses a pre-installed Drill.)
+ * <p>
+ * This single operation combines upload and launch because the upload
+ * Information is needed by the launch.
+ * <p>
+ * On the surface, it would seem that uploading a file and launching an app
+ * should be simple operations. However, under YARN, we must handle a large
+ * number of details that must be gotten exactly right. Plus, both the upload
+ * and launch can be slow operations, so we provide feedback to the user that
+ * something is, indeed, happening.
+ */
+
+public class StartCommand extends ClientCommand {
+ private Config config;
+ private boolean upload;
+ private boolean launch;
+ private boolean dryRun;
+
+ public StartCommand(boolean upload, boolean launch) {
+ this.upload = upload;
+ this.launch = launch;
+ }
+
+ @Override
+ public void run() throws ClientException {
+ checkExistingApp();
+
+ dryRun = opts.dryRun;
+ config = DrillOnYarnConfig.config();
+ FileUploader uploader = upload();
+ if (launch) {
+ launch(uploader);
+ }
+ }
+
+ /**
+ * Check if an application ID file exists. If it does, check if an application
+ * is running. If an app is running, then we can't start a new one. If the app
+ * is not running, then clean up the "orphan" app id file.
+ *
+ * @throws ClientException
+ */
+
+ private void checkExistingApp() throws ClientException {
+ File appIdFile = getAppIdFile();
+ if (!appIdFile.exists()) {
+ return;
+ }
+
+ // File exists. Ask YARN about status.
+
+ Reporter reporter;
+ ApplicationId appId;
+ try {
+ System.out.println("Found app ID file: " + appIdFile.getAbsolutePath());
+ appId = checkAppId();
+ System.out.print("Checking application ID: " + appId.toString() + "...");
+ YarnRMClient client = new YarnRMClient(appId);
+ reporter = new Reporter(client);
+ reporter.getReport();
+ } catch (ClientException e) {
+ // This exception occurs when we ask for a report about an application
+ // that
+ // YARN does not know about. (YARN has likely been restarted.)
+
+ System.out.println(" Not running.");
+ appIdFile.delete();
+ return;
+ }
+
+ // YARN knows about the application. But, was it stopped, perhaps from the
+ // web UI?
+
+ if (reporter.isStopped()) {
+ System.out.println(" Completed with state " + reporter.getState());
+ appIdFile.delete();
+ return;
+ }
+
+ // The app (or another one with the same App ID) is running.
+
+ System.out.println(" Still running!");
+ throw new ClientException(
+ "Error: AM already running as Application ID: " + appId);
+ }
+
+ private FileUploader upload() throws ClientException {
+ FileUploader uploader;
+ if (!config.getBoolean(DrillOnYarnConfig.LOCALIZE_DRILL)) {
+ uploader = new FileUploader.NonLocalized(dryRun, opts.verbose);
+ } else if (upload) {
+ uploader = new FileUploader.UploadFiles(opts.force, dryRun, opts.verbose);
+ } else {
+ uploader = new FileUploader.ReuseFiles(dryRun, opts.verbose);
+ }
+ uploader.run();
+ return uploader;
+ }
+
+ private void launch(FileUploader uploader) throws ClientException {
+ AMRunner runner = new AMRunner(config, opts.verbose, dryRun);
+ runner.resources = uploader.resources;
+ runner.remoteDrillHome = uploader.remoteDrillHome;
+ runner.remoteSiteDir = uploader.remoteSiteDir;
+ if (uploader.isLocalized()) {
+ runner.drillArchivePath = uploader.drillArchivePath.toString();
+ if (uploader.hasSiteDir()) {
+ runner.siteArchivePath = uploader.siteArchivePath.toString();
+ }
+ }
+ runner.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/client/StatusCommand.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/client/StatusCommand.java b/drill-yarn/src/main/java/org/apache/drill/yarn/client/StatusCommand.java
new file mode 100644
index 0000000..863b700
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/client/StatusCommand.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.client;
+
+import org.apache.drill.yarn.core.DoYUtil;
+import org.apache.drill.yarn.core.YarnClientException;
+import org.apache.drill.yarn.core.YarnRMClient;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
+/**
+ * Client command that prints the YARN application report for the Drill
+ * application and, when the application is running, a status summary fetched
+ * from the Application Master (AM) REST interface.
+ */
+public class StatusCommand extends ClientCommand {
+  /**
+   * Fetches and presents the YARN application report. {@link #getReport()}
+   * must succeed before any other method is used: the other methods read the
+   * stored report without a null check.
+   */
+  public static class Reporter {
+    private YarnRMClient client;
+    ApplicationReport report;
+
+    public Reporter(YarnRMClient client) {
+      this.client = client;
+    }
+
+    /**
+     * Fetches the application report from the client and stores it.
+     *
+     * @throws ClientException
+     *           if the client fails to obtain the report
+     */
+    public void getReport() throws ClientException {
+      try {
+        report = client.getAppReport();
+      } catch (YarnClientException e) {
+        throw new ClientException(
+            "Failed to get report for Drill application master", e);
+      }
+    }
+
+    /**
+     * Prints the report to standard output.
+     *
+     * @param verbose
+     *          print the detail fields even for a new application
+     * @param isNew
+     *          {@code true} for a newly started application: the detail
+     *          fields are omitted unless {@code verbose}, and the AM URL is
+     *          printed
+     */
+    public void display(boolean verbose, boolean isNew) {
+      YarnApplicationState state = report.getYarnApplicationState();
+      // The original tested this same condition twice in a row; the two
+      // groups of fields are therefore always printed together.
+      if (verbose || !isNew) {
+        System.out.println("Application State: " + state.toString());
+        System.out.println("Host: " + report.getHost());
+        System.out.println("Queue: " + report.getQueue());
+        System.out.println("User: " + report.getUser());
+        long startTime = report.getStartTime();
+        System.out.println("Start Time: " + DoYUtil.toIsoTime(startTime));
+        System.out.println("Application Name: " + report.getName());
+      }
+      System.out.println("Tracking URL: " + report.getTrackingUrl());
+      if (isNew) {
+        System.out.println("Application Master URL: " + getAmUrl());
+      }
+      showFinalStatus();
+    }
+
+    /**
+     * @return the AM URL derived from the stored report; see
+     *         {@link StatusCommand#getAmUrl(ApplicationReport)}
+     */
+    public String getAmUrl() {
+      return StatusCommand.getAmUrl(report);
+    }
+
+    /**
+     * Prints the final status when the application state is FAILED or
+     * FINISHED, followed by the diagnostics text when the final status is
+     * not SUCCEEDED and the diagnostics are not blank.
+     */
+    public void showFinalStatus() {
+      YarnApplicationState state = report.getYarnApplicationState();
+      if (state == YarnApplicationState.FAILED
+          || state == YarnApplicationState.FINISHED) {
+        FinalApplicationStatus status = report.getFinalApplicationStatus();
+        System.out.println("Final status: " + status.toString());
+        if (status != FinalApplicationStatus.SUCCEEDED) {
+          String diag = report.getDiagnostics();
+          if (!DoYUtil.isBlank(diag)) {
+            System.out.println("Diagnostics: " + diag);
+          }
+        }
+      }
+    }
+
+    /** @return the YARN application state from the stored report */
+    public YarnApplicationState getState() {
+      return report.getYarnApplicationState();
+    }
+
+    /**
+     * @return {@code true} if the state is ACCEPTED, NEW, NEW_SAVING or
+     *         SUBMITTED
+     */
+    public boolean isStarting() {
+      YarnApplicationState state = getState();
+      return state == YarnApplicationState.ACCEPTED
+          || state == YarnApplicationState.NEW
+          || state == YarnApplicationState.NEW_SAVING
+          || state == YarnApplicationState.SUBMITTED;
+    }
+
+    /** @return {@code true} if the state is FAILED, FINISHED or KILLED */
+    public boolean isStopped() {
+      YarnApplicationState state = getState();
+      return state == YarnApplicationState.FAILED
+          || state == YarnApplicationState.FINISHED
+          || state == YarnApplicationState.KILLED;
+    }
+
+    /** @return {@code true} if the state is RUNNING */
+    public boolean isRunning() {
+      YarnApplicationState state = getState();
+      return state == YarnApplicationState.RUNNING;
+    }
+  }
+
+  /**
+   * Derives the AM URL by passing the report's original tracking URL through
+   * {@code DoYUtil.unwrapAmUrl}.
+   *
+   * @param report
+   *          the YARN application report
+   * @return the value returned by {@code DoYUtil.unwrapAmUrl}; callers in
+   *         this class treat a blank value as "no URL available"
+   */
+  public static String getAmUrl(ApplicationReport report) {
+    return DoYUtil.unwrapAmUrl(report.getOriginalTrackingUrl());
+  }
+
+  /**
+   * Prints the application ID and report, then the AM status if the
+   * application is running. If the report cannot be fetched, the app ID file
+   * is removed and the application is reported as not running.
+   */
+  @Override
+  public void run() throws ClientException {
+    YarnRMClient client = getClient();
+    System.out.println("Application ID: " + client.getAppId().toString());
+    Reporter reporter = new Reporter(client);
+    try {
+      reporter.getReport();
+    } catch (Exception e) {
+      // Any failure to fetch the report is treated as "not running".
+      removeAppIdFile();
+      System.out.println("Application is not running.");
+      return;
+    }
+    reporter.display(opts.verbose, false);
+    if (reporter.isRunning()) {
+      showAmStatus(reporter.report);
+    }
+  }
+
+  /**
+   * Fetches {@code rest/status} from the AM and prints the formatted result.
+   * Does nothing if no AM URL is available; a failed request is reported but
+   * not propagated.
+   */
+  private void showAmStatus(ApplicationReport report) {
+    try {
+      String baseUrl = getAmUrl(report);
+      if (DoYUtil.isBlank(baseUrl)) {
+        return;
+      }
+      SimpleRestClient restClient = new SimpleRestClient();
+      String tail = "rest/status";
+      if (opts.verbose) {
+        System.out.println("Getting status with " + baseUrl + "/" + tail);
+      }
+      String result = restClient.send(baseUrl, tail, false);
+      formatResponse(result);
+      System.out.println("For more information, visit: " + baseUrl);
+    } catch (ClientException e) {
+      System.out.println("Failed to get AM status");
+      System.err.println(e.getMessage());
+    }
+  }
+
+  /**
+   * Parses the AM status reply and prints the metrics it contains. A reply
+   * that is not valid JSON, or is valid JSON but not an object, is reported
+   * as invalid.
+   *
+   * @param result
+   *          raw response body returned by the AM
+   */
+  private void formatResponse(String result) {
+    JSONParser parser = new JSONParser();
+    Object status;
+    try {
+      status = parser.parse(result);
+    } catch (ParseException e) {
+      System.err.println("Invalid response received from AM");
+      if (opts.verbose) {
+        System.out.println(result);
+        System.out.println(e.getMessage());
+      }
+      return;
+    }
+
+    // Well-formed JSON need not be an object (it may be an array, a scalar
+    // or null); check before casting so such a reply is reported as invalid
+    // rather than surfacing as a ClassCastException or NullPointerException.
+    if (!(status instanceof JSONObject)) {
+      System.err.println("Invalid response received from AM");
+      if (opts.verbose) {
+        System.out.println(result);
+      }
+      return;
+    }
+    JSONObject root = (JSONObject) status;
+    Object summary = root.get("summary");
+    showMetric("AM State", root, "state");
+    showMetric("Target Drillbit Count", summary, "targetBitCount");
+    showMetric("Live Drillbit Count", summary, "liveBitCount");
+    showMetric("Unmanaged Drillbit Count", summary, "unmanagedCount");
+    showMetric("Blacklisted Node Count", summary, "blackListCount");
+    showMetric("Free Node Count", summary, "freeNodeCount");
+  }
+
+  /**
+   * Prints {@code label: value} for the given key. Prints nothing if
+   * {@code object} is not a JSON object (including null) or has no value for
+   * the key.
+   */
+  private void showMetric(String label, Object object, String key) {
+    // instanceof is false for null, so this also covers a missing object.
+    if (!(object instanceof JSONObject)) {
+      return;
+    }
+    Object value = ((JSONObject) object).get(key);
+    if (value == null) {
+      return;
+    }
+    System.out.println(label + ": " + value.toString());
+  }
+}