You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/05 17:32:18 UTC
[31/51] [partial] hive git commit: HIVE-14671 : merge master into
hive-14535 (Wei Zheng)
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
index b36d4ff..1b57e38 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
@@ -24,11 +24,11 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
+import java.net.URISyntaxException;
import java.text.DecimalFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -41,20 +41,28 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.cli.LlapStatusOptionsProcessor.LlapStatusOptions;
+import org.apache.hadoop.hive.llap.cli.status.LlapStatusHelpers;
+import org.apache.hadoop.hive.llap.cli.status.LlapStatusHelpers.AppStatusBuilder;
+import org.apache.hadoop.hive.llap.cli.status.LlapStatusHelpers.LlapInstance;
import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.slider.api.ClusterDescription;
import org.apache.slider.api.ClusterDescriptionKeys;
+import org.apache.slider.api.StateValues;
import org.apache.slider.api.StatusKeys;
+import org.apache.slider.api.types.ApplicationDiagnostics;
+import org.apache.slider.api.types.ContainerInformation;
import org.apache.slider.client.SliderClient;
+import org.apache.slider.common.params.ActionDiagnosticArgs;
import org.apache.slider.core.exceptions.SliderException;
-import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -64,6 +72,7 @@ import org.slf4j.LoggerFactory;
public class LlapStatusServiceDriver {
private static final Logger LOG = LoggerFactory.getLogger(LlapStatusServiceDriver.class);
+ private static final Logger CONSOLE_LOGGER = LoggerFactory.getLogger("LlapStatusServiceDriverConsole");
// Defining a bunch of configs here instead of in HiveConf. These are experimental, and mainly
// for use when retry handling is fixed in Yarn/Hadoop
@@ -104,6 +113,8 @@ public class LlapStatusServiceDriver {
CONF_PREFIX + "zk-registry.timeout-ms";
private static final long CONFIG_LLAP_ZK_REGISTRY_TIMEOUT_MS_DEFAULT = 20000l;
+ private static final long LOG_SUMMARY_INTERVAL = 15000L; // Log summary every ~15 seconds.
+
private static final String LLAP_KEY = "LLAP";
private final Configuration conf;
private final Clock clock = new SystemClock();
@@ -161,7 +172,8 @@ public class LlapStatusServiceDriver {
* @param args
* @return command line options.
*/
- public LlapStatusOptions parseOptions(String[] args) throws LlapStatusCliException {
+ public LlapStatusOptions parseOptions(String[] args) throws
+ LlapStatusCliException {
LlapStatusOptionsProcessor optionsProcessor = new LlapStatusOptionsProcessor();
LlapStatusOptions options;
@@ -209,16 +221,21 @@ public class LlapStatusServiceDriver {
}
try {
- sliderClient = createSliderClient();
- } catch (LlapStatusCliException e) {
- logError(e);
- return e.getExitCode().getInt();
+ if (sliderClient == null) {
+ sliderClient = LlapSliderUtils.createSliderClient(conf);
+ }
+ } catch (Exception e) {
+ LlapStatusCliException le = new LlapStatusCliException(
+ LlapStatusServiceDriver.ExitCode.SLIDER_CLIENT_ERROR_CREATE_FAILED,
+ "Failed to create slider client", e);
+ logError(le);
+ return le.getExitCode().getInt();
}
// Get the App report from YARN
ApplicationReport appReport;
try {
- appReport = getAppReport(appName, sliderClient, options.getFindAppTimeoutMs());
+ appReport = LlapSliderUtils.getAppReport(appName, sliderClient, options.getFindAppTimeoutMs());
} catch (LlapStatusCliException e) {
logError(e);
return e.getExitCode().getInt();
@@ -235,13 +252,13 @@ public class LlapStatusServiceDriver {
if (ret != ExitCode.SUCCESS) {
return ret.getInt();
- } else if (EnumSet.of(State.APP_NOT_FOUND, State.COMPLETE, State.LAUNCHING)
+ } else if (EnumSet.of(LlapStatusHelpers.State.APP_NOT_FOUND, LlapStatusHelpers.State.COMPLETE, LlapStatusHelpers.State.LAUNCHING)
.contains(appStatusBuilder.getState())) {
return ExitCode.SUCCESS.getInt();
} else {
// Get information from slider.
try {
- ret = populateAppStatusFromSlider(appName, sliderClient, appStatusBuilder);
+ ret = populateAppStatusFromSliderStatus(appName, sliderClient, appStatusBuilder);
} catch (LlapStatusCliException e) {
// In case of failure, send back whatever is constructed sop far - which wouldbe from the AppReport
logError(e);
@@ -249,6 +266,18 @@ public class LlapStatusServiceDriver {
}
}
+
+ if (ret != ExitCode.SUCCESS) {
+ return ret.getInt();
+ } else {
+ try {
+ ret = populateAppStatusFromSliderDiagnostics(appName, sliderClient, appStatusBuilder);
+ } catch (LlapStatusCliException e) {
+ logError(e);
+ return e.getExitCode().getInt();
+ }
+ }
+
if (ret != ExitCode.SUCCESS) {
return ret.getInt();
} else {
@@ -268,7 +297,8 @@ public class LlapStatusServiceDriver {
}
}
- public void outputJson(PrintWriter writer) throws LlapStatusCliException {
+ public void outputJson(PrintWriter writer) throws
+ LlapStatusCliException {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);
mapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
@@ -342,25 +372,27 @@ public class LlapStatusServiceDriver {
* @throws LlapStatusCliException
*/
private ExitCode processAppReport(ApplicationReport appReport,
- AppStatusBuilder appStatusBuilder) throws LlapStatusCliException {
+ AppStatusBuilder appStatusBuilder) throws
+ LlapStatusCliException {
if (appReport == null) {
- appStatusBuilder.setState(State.APP_NOT_FOUND);
+ appStatusBuilder.setState(LlapStatusHelpers.State.APP_NOT_FOUND);
LOG.info("No Application Found");
return ExitCode.SUCCESS;
}
+ // TODO Maybe add the YARN URL for the app.
appStatusBuilder.setAmInfo(
- new AmInfo().setAppName(appReport.getName()).setAppType(appReport.getApplicationType()));
+ new LlapStatusHelpers.AmInfo().setAppName(appReport.getName()).setAppType(appReport.getApplicationType()));
appStatusBuilder.setAppStartTime(appReport.getStartTime());
switch (appReport.getYarnApplicationState()) {
case NEW:
case NEW_SAVING:
case SUBMITTED:
- appStatusBuilder.setState(State.LAUNCHING);
+ appStatusBuilder.setState(LlapStatusHelpers.State.LAUNCHING);
return ExitCode.SUCCESS;
case ACCEPTED:
appStatusBuilder.maybeCreateAndGetAmInfo().setAppId(appReport.getApplicationId().toString());
- appStatusBuilder.setState(State.LAUNCHING);
+ appStatusBuilder.setState(LlapStatusHelpers.State.LAUNCHING);
return ExitCode.SUCCESS;
case RUNNING:
appStatusBuilder.maybeCreateAndGetAmInfo().setAppId(appReport.getApplicationId().toString());
@@ -371,7 +403,13 @@ public class LlapStatusServiceDriver {
case KILLED:
appStatusBuilder.maybeCreateAndGetAmInfo().setAppId(appReport.getApplicationId().toString());
appStatusBuilder.setAppFinishTime(appReport.getFinishTime());
- appStatusBuilder.setState(State.COMPLETE);
+ appStatusBuilder.setState(LlapStatusHelpers.State.COMPLETE);
+ ApplicationDiagnostics appDiagnostics = LlapSliderUtils.getApplicationDiagnosticsFromYarnDiagnostics(appReport, LOG);
+ if (appDiagnostics == null) {
+ LOG.warn("AppDiagnostics not available for YARN application report");
+ } else {
+ processAppDiagnostics(appStatusBuilder, appDiagnostics, true);
+ }
return ExitCode.SUCCESS;
default:
throw new LlapStatusCliException(ExitCode.INTERNAL_ERROR,
@@ -380,7 +418,11 @@ public class LlapStatusServiceDriver {
}
+
+
+
/**
+ * Populates information from SliderStatus.
*
* @param appName
* @param sliderClient
@@ -388,7 +430,7 @@ public class LlapStatusServiceDriver {
* @return an ExitCode. An ExitCode other than ExitCode.SUCCESS implies future progress not possible
* @throws LlapStatusCliException
*/
- private ExitCode populateAppStatusFromSlider(String appName, SliderClient sliderClient, AppStatusBuilder appStatusBuilder) throws
+ private ExitCode populateAppStatusFromSliderStatus(String appName, SliderClient sliderClient, AppStatusBuilder appStatusBuilder) throws
LlapStatusCliException {
ClusterDescription clusterDescription;
@@ -450,9 +492,10 @@ public class LlapStatusServiceDriver {
String host = (String) containerParams.get("host");
- LlapInstance llapInstance = new LlapInstance(host, containerIdString);
+ LlapInstance
+ llapInstance = new LlapInstance(host, containerIdString);
- appStatusBuilder.addNewLlapInstance(llapInstance);
+ appStatusBuilder.addNewRunningLlapInstance(llapInstance);
}
}
@@ -464,8 +507,45 @@ public class LlapStatusServiceDriver {
}
}
+ /**
+ * Populates information based on the slider diagnostics call. Must be invoked
+ * after populating status from slider status.
+ * @param appName
+ * @param sliderClient
+ * @param appStatusBuilder
+ * @return
+ * @throws LlapStatusCliException
+ */
+ private ExitCode populateAppStatusFromSliderDiagnostics(String appName,
+ SliderClient sliderClient,
+ AppStatusBuilder appStatusBuilder) throws
+ LlapStatusCliException {
+
+ ApplicationDiagnostics appDiagnostics;
+ try {
+ ActionDiagnosticArgs args = new ActionDiagnosticArgs();
+ args.containers = true;
+ args.name = appName;
+ appDiagnostics =
+ sliderClient.actionDiagnosticContainers(args);
+ } catch (YarnException | IOException | URISyntaxException e) {
+ throw new LlapStatusCliException(
+ ExitCode.SLIDER_CLIENT_ERROR_OTHER,
+ "Failed to get container diagnostics from slider", e);
+ }
+ if (appDiagnostics == null) {
+ LOG.info("Slider container diagnostics not available");
+ return ExitCode.SLIDER_CLIENT_ERROR_OTHER;
+ }
+
+ processAppDiagnostics(appStatusBuilder, appDiagnostics, false);
+
+ return ExitCode.SUCCESS;
+ }
/**
+ * Populate additional information for containers from the LLAP registry. Must be invoked
+ * after Slider status. Also after slider-diagnostics.
* @param appStatusBuilder
* @return an ExitCode. An ExitCode other than ExitCode.SUCCESS implies future progress not possible
* @throws LlapStatusCliException
@@ -491,10 +571,12 @@ public class LlapStatusServiceDriver {
}
if (serviceInstances == null || serviceInstances.isEmpty()) {
- LOG.info("No information found in the LLAP registry");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No information found in the LLAP registry");
+ }
appStatusBuilder.setLiveInstances(0);
- appStatusBuilder.setState(State.LAUNCHING);
- appStatusBuilder.clearLlapInstances();
+ appStatusBuilder.setState(LlapStatusHelpers.State.LAUNCHING);
+ appStatusBuilder.clearRunningLlapInstances();
return ExitCode.SUCCESS;
} else {
// Tracks instances known by both slider and llap.
@@ -505,7 +587,7 @@ public class LlapStatusServiceDriver {
String containerIdString = serviceInstance.getProperties().get(
HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
- LlapInstance llapInstance = appStatusBuilder.removeAndgetLlapInstanceForContainer(
+ LlapInstance llapInstance = appStatusBuilder.removeAndGetRunningLlapInstanceForContainer(
containerIdString);
if (llapInstance != null) {
llapInstance.setMgmtPort(serviceInstance.getManagementPort());
@@ -524,375 +606,185 @@ public class LlapStatusServiceDriver {
}
appStatusBuilder.setLiveInstances(validatedInstances.size());
+ appStatusBuilder.setLaunchingInstances(llapExtraInstances.size());
if (validatedInstances.size() >= appStatusBuilder.getDesiredInstances()) {
- appStatusBuilder.setState(State.RUNNING_ALL);
+ appStatusBuilder.setState(LlapStatusHelpers.State.RUNNING_ALL);
if (validatedInstances.size() > appStatusBuilder.getDesiredInstances()) {
LOG.warn("Found more entries in LLAP registry, as compared to desired entries");
}
} else {
if (validatedInstances.size() > 0) {
- appStatusBuilder.setState(State.RUNNING_PARTIAL);
+ appStatusBuilder.setState(LlapStatusHelpers.State.RUNNING_PARTIAL);
} else {
- appStatusBuilder.setState(State.LAUNCHING);
+ appStatusBuilder.setState(LlapStatusHelpers.State.LAUNCHING);
}
}
// At this point, everything that can be consumed from AppStatusBuilder has been consumed.
// Debug only
- if (appStatusBuilder.allInstances().size() > 0) {
+ if (appStatusBuilder.allRunningInstances().size() > 0) {
// Containers likely to come up soon.
- LOG.debug("Potential instances starting up: {}", appStatusBuilder.allInstances());
+ LOG.debug("Potential instances starting up: {}", appStatusBuilder.allRunningInstances());
}
if (llapExtraInstances.size() > 0) {
- // Old containers which are likely shutting down
+ // Old containers which are likely shutting down, or new containers which
+ // launched between slider-status/slider-diagnostics. Skip for this iteration.
LOG.debug("Instances likely to shutdown soon: {}", llapExtraInstances);
}
- appStatusBuilder.clearAndAddPreviouslyKnownInstances(validatedInstances);
+ appStatusBuilder.clearAndAddPreviouslyKnownRunningInstances(validatedInstances);
}
return ExitCode.SUCCESS;
}
- static final class AppStatusBuilder {
-
- private AmInfo amInfo;
- private State state = State.UNKNOWN;
- private String originalConfigurationPath;
- private String generatedConfigurationPath;
-
- private int desiredInstances = -1;
- private int liveInstances = -1;
-
- private Long appStartTime;
- private Long appFinishTime;
-
- private boolean runningThresholdAchieved = false;
-
- private final List<LlapInstance> llapInstances = new LinkedList<>();
-
- private transient Map<String, LlapInstance> containerToInstanceMap = new HashMap<>();
-
- public void setAmInfo(AmInfo amInfo) {
- this.amInfo = amInfo;
- }
-
- public AppStatusBuilder setState(
- State state) {
- this.state = state;
- return this;
- }
-
- public AppStatusBuilder setOriginalConfigurationPath(String originalConfigurationPath) {
- this.originalConfigurationPath = originalConfigurationPath;
- return this;
- }
-
- public AppStatusBuilder setGeneratedConfigurationPath(String generatedConfigurationPath) {
- this.generatedConfigurationPath = generatedConfigurationPath;
- return this;
- }
-
- public AppStatusBuilder setAppStartTime(long appStartTime) {
- this.appStartTime = appStartTime;
- return this;
- }
-
- public AppStatusBuilder setAppFinishTime(long finishTime) {
- this.appFinishTime = finishTime;
- return this;
- }
-
- public AppStatusBuilder setDesiredInstances(int desiredInstances) {
- this.desiredInstances = desiredInstances;
- return this;
- }
-
- public AppStatusBuilder setLiveInstances(int liveInstances) {
- this.liveInstances = liveInstances;
- return this;
- }
-
- public AppStatusBuilder addNewLlapInstance(LlapInstance llapInstance) {
- this.llapInstances.add(llapInstance);
- this.containerToInstanceMap.put(llapInstance.getContainerId(), llapInstance);
- return this;
- }
-
- public AppStatusBuilder setRunningThresholdAchieved(boolean thresholdAchieved) {
- this.runningThresholdAchieved = thresholdAchieved;
- return this;
- }
-
- public LlapInstance removeAndgetLlapInstanceForContainer(String containerIdString) {
- return containerToInstanceMap.remove(containerIdString);
- }
-
- public void clearLlapInstances() {
- this.llapInstances.clear();
- this.containerToInstanceMap.clear();
- }
-
- public AppStatusBuilder clearAndAddPreviouslyKnownInstances(List<LlapInstance> llapInstances) {
- clearLlapInstances();
- for (LlapInstance llapInstance : llapInstances) {
- addNewLlapInstance(llapInstance);
+ private static void processAppDiagnostics(AppStatusBuilder appStatusBuilder,
+ ApplicationDiagnostics appDiagnostics, boolean appComplete) {
+ // For a running app this should be empty.
+ String finalMessage = appDiagnostics.getFinalMessage();
+ Collection<ContainerInformation> containerInfos =
+ appDiagnostics.getContainers();
+ appStatusBuilder.setDiagnostics(finalMessage);
+ if (containerInfos != null) {
+ for (ContainerInformation containerInformation : containerInfos) {
+ if (containerInformation.getState() == StateValues.STATE_LIVE && !appComplete) {
+ LlapInstance instance = appStatusBuilder
+ .removeAndGetCompletedLlapInstanceForContainer(
+ containerInformation.getContainerId());
+ if (instance ==
+ null) { // New launch. Not available during slider status, but available now.
+ instance = new LlapInstance(containerInformation.getHost(),
+ containerInformation.getContainerId());
+ }
+ instance.setLogUrl(containerInformation.getLogLink());
+ appStatusBuilder.addNewRunningLlapInstance(instance);
+ } else if (containerInformation.getState() ==
+ StateValues.STATE_STOPPED || appComplete) {
+ LlapInstance instance =
+ new LlapInstance(containerInformation.getHost(),
+ containerInformation.getContainerId());
+ instance.setLogUrl(containerInformation.getLogLink());
+ if (appComplete && containerInformation.getExitCode() !=
+ ContainerExitStatus.INVALID) {
+ instance
+ .setYarnContainerExitStatus(containerInformation.getExitCode());
+ }
+ instance.setDiagnostics(containerInformation.getDiagnostics());
+ appStatusBuilder.addNewCompleteLlapInstance(instance);
+ } else {
+ LOG.warn("Unexpected containerstate={}, for container={}",
+ containerInformation.getState(), containerInformation);
+ }
}
- return this;
- }
-
- @JsonIgnore
- public List<LlapInstance> allInstances() {
- return this.llapInstances;
- }
-
- public AmInfo getAmInfo() {
- return amInfo;
- }
-
- public State getState() {
- return state;
- }
-
- public String getOriginalConfigurationPath() {
- return originalConfigurationPath;
- }
-
- public String getGeneratedConfigurationPath() {
- return generatedConfigurationPath;
- }
-
- public int getDesiredInstances() {
- return desiredInstances;
- }
-
- public int getLiveInstances() {
- return liveInstances;
- }
-
- public Long getAppStartTime() {
- return appStartTime;
- }
-
- public Long getAppFinishTime() {
- return appFinishTime;
- }
-
- public List<LlapInstance> getLlapInstances() {
- return llapInstances;
- }
-
- public boolean isRunningThresholdAchieved() {
- return runningThresholdAchieved;
- }
-
- @JsonIgnore
- public AmInfo maybeCreateAndGetAmInfo() {
- if (amInfo == null) {
- amInfo = new AmInfo();
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ContainerInfos is null");
}
- return amInfo;
- }
-
- @Override
- public String toString() {
- return "AppStatusBuilder{" +
- "amInfo=" + amInfo +
- ", state=" + state +
- ", originalConfigurationPath='" + originalConfigurationPath + '\'' +
- ", generatedConfigurationPath='" + generatedConfigurationPath + '\'' +
- ", desiredInstances=" + desiredInstances +
- ", liveInstances=" + liveInstances +
- ", appStartTime=" + appStartTime +
- ", appFinishTime=" + appFinishTime +
- ", llapInstances=" + llapInstances +
- ", containerToInstanceMap=" + containerToInstanceMap +
- '}';
}
}
- static class AmInfo {
- private String appName;
- private String appType;
- private String appId;
- private String containerId;
- private String hostname;
- private String amWebUrl;
-
- public AmInfo setAppName(String appName) {
- this.appName = appName;
- return this;
- }
-
- public AmInfo setAppType(String appType) {
- this.appType = appType;
- return this;
- }
-
- public AmInfo setAppId(String appId) {
- this.appId = appId;
- return this;
- }
-
- public AmInfo setContainerId(String containerId) {
- this.containerId = containerId;
- return this;
- }
-
- public AmInfo setHostname(String hostname) {
- this.hostname = hostname;
- return this;
- }
-
- public AmInfo setAmWebUrl(String amWebUrl) {
- this.amWebUrl = amWebUrl;
- return this;
- }
-
- public String getAppName() {
- return appName;
- }
-
- public String getAppType() {
- return appType;
- }
-
- public String getAppId() {
- return appId;
- }
-
- public String getContainerId() {
- return containerId;
- }
-
- public String getHostname() {
- return hostname;
- }
-
- public String getAmWebUrl() {
- return amWebUrl;
- }
-
- @Override
- public String toString() {
- return "AmInfo{" +
- "appName='" + appName + '\'' +
- ", appType='" + appType + '\'' +
- ", appId='" + appId + '\'' +
- ", containerId='" + containerId + '\'' +
- ", hostname='" + hostname + '\'' +
- ", amWebUrl='" + amWebUrl + '\'' +
- '}';
- }
- }
-
- static class LlapInstance {
- private final String hostname;
- private final String containerId;
- private String statusUrl;
- private String webUrl;
- private Integer rpcPort;
- private Integer mgmtPort;
- private Integer shufflePort;
-
- // TODO HIVE-13454 Add additional information such as #executors, container size, etc
-
- public LlapInstance(String hostname, String containerId) {
- this.hostname = hostname;
- this.containerId = containerId;
- }
-
- public LlapInstance setWebUrl(String webUrl) {
- this.webUrl = webUrl;
- return this;
- }
-
- public LlapInstance setStatusUrl(String statusUrl) {
- this.statusUrl = statusUrl;
- return this;
- }
-
- public LlapInstance setRpcPort(int rpcPort) {
- this.rpcPort = rpcPort;
- return this;
- }
-
- public LlapInstance setMgmtPort(int mgmtPort) {
- this.mgmtPort = mgmtPort;
- return this;
- }
-
- public LlapInstance setShufflePort(int shufflePort) {
- this.shufflePort = shufflePort;
- return this;
- }
-
- public String getHostname() {
- return hostname;
- }
-
- public String getStatusUrl() {
- return statusUrl;
- }
-
- public String getContainerId() {
- return containerId;
- }
-
- public String getWebUrl() {
- return webUrl;
- }
-
- public Integer getRpcPort() {
- return rpcPort;
- }
-
- public Integer getMgmtPort() {
- return mgmtPort;
- }
-
- public Integer getShufflePort() {
- return shufflePort;
- }
+ private static String constructCompletedContainerDiagnostics(List<LlapInstance> completedInstances) {
+ StringBuilder sb = new StringBuilder();
+ if (completedInstances == null || completedInstances.size() == 0) {
+ return "";
+ } else {
+ // TODO HIVE-15865 Ideally sort these by completion time, once that is available.
+ boolean isFirst = true;
+ for (LlapInstance instance : completedInstances) {
+ if (!isFirst) {
+ sb.append("\n");
+ } else {
+ isFirst = false;
+ }
- @Override
- public String toString() {
- return "LlapInstance{" +
- "hostname='" + hostname + '\'' +
- ", containerId='" + containerId + '\'' +
- ", statusUrl='" + statusUrl + '\'' +
- ", webUrl='" + webUrl + '\'' +
- ", rpcPort=" + rpcPort +
- ", mgmtPort=" + mgmtPort +
- ", shufflePort=" + shufflePort +
- '}';
+ if (instance.getYarnContainerExitStatus() ==
+ ContainerExitStatus.KILLED_EXCEEDED_PMEM ||
+ instance.getYarnContainerExitStatus() ==
+ ContainerExitStatus.KILLED_EXCEEDED_VMEM) {
+ sb.append("\tKILLED container (by YARN for exceeding memory limits): ");
+ } else {
+ // TODO HIVE-15865 Handle additional reasons like OS launch failed (Slider needs to give this info)
+ sb.append("\tFAILED container: ");
+ }
+ sb.append(" ").append(instance.getContainerId());
+ sb.append(", Logs at: ").append(instance.getLogUrl());
+ }
}
+ return sb.toString();
}
- static class LlapStatusCliException extends Exception {
- final ExitCode exitCode;
+ /**
+ * Helper method to construct a diagnostic message from a complete
+ * AppStatusBuilder.
+ *
+ * @return
+ */
+ private static String constructDiagnostics(
+ AppStatusBuilder appStatusBuilder) {
+ StringBuilder sb = new StringBuilder();
+
+ switch (appStatusBuilder.getState()) {
+ case APP_NOT_FOUND:
+ sb.append("LLAP status unknown. Awaiting app launch");
+ break;
+ case LAUNCHING:
+ // This is a catch all state - when containers have not started yet, or LLAP has not started yet.
+ if (StringUtils.isNotBlank(appStatusBuilder.getAmInfo().getAppId())) {
+ sb.append("LLAP Starting up with AppId=")
+ .append(appStatusBuilder.getAmInfo().getAppId()).append(".");
+ if (appStatusBuilder.getDesiredInstances() != null) {
+ sb.append(" Started 0/").append(appStatusBuilder.getDesiredInstances()).append(" instances");
+ }
+ String containerDiagnostics = constructCompletedContainerDiagnostics(
+ appStatusBuilder.getCompletedInstances());
+ if (StringUtils.isNotEmpty(containerDiagnostics)) {
+ sb.append("\n").append(containerDiagnostics);
+ }
+ } else {
+ sb.append("Awaiting LLAP startup");
+ }
+ break;
+ case RUNNING_PARTIAL:
+ sb.append("LLAP Starting up with ApplicationId=")
+ .append(appStatusBuilder.getAmInfo().getAppId());
+ sb.append(" Started").append(appStatusBuilder.getLiveInstances())
+ .append("/").append(appStatusBuilder.getDesiredInstances())
+ .append(" instances");
+ String containerDiagnostics = constructCompletedContainerDiagnostics(
+ appStatusBuilder.getCompletedInstances());
+ if (StringUtils.isNotEmpty(containerDiagnostics)) {
+ sb.append("\n").append(containerDiagnostics);
+ }
- public LlapStatusCliException(ExitCode exitCode, String message) {
- super(exitCode.getInt() +": " + message);
- this.exitCode = exitCode;
- }
+ // TODO HIVE-15865: Include information about pending requests, and last allocation time
+ // once Slider provides this information.
+ break;
+ case RUNNING_ALL:
+ sb.append("LLAP Application running with ApplicationId=")
+ .append(appStatusBuilder.getAmInfo().getAppId());
+ break;
+ case COMPLETE:
+
+ sb.append("LLAP Application already complete. ApplicationId=")
+ .append(appStatusBuilder.getAmInfo().getAppId());
+ containerDiagnostics = constructCompletedContainerDiagnostics(
+ appStatusBuilder.getCompletedInstances());
+ if (StringUtils.isNotEmpty(containerDiagnostics)) {
+ sb.append("\n").append(containerDiagnostics);
+ }
- public LlapStatusCliException(ExitCode exitCode, String message, Throwable cause) {
- super(message, cause);
- this.exitCode = exitCode;
+ break;
+ case UNKNOWN:
+ sb.append("LLAP status unknown");
+ break;
}
-
- public ExitCode getExitCode() {
- return exitCode;
+ if (StringUtils.isNotBlank(appStatusBuilder.getDiagnostics())) {
+ sb.append("\n").append(appStatusBuilder.getDiagnostics());
}
- }
- enum State {
- APP_NOT_FOUND, LAUNCHING,
- RUNNING_PARTIAL,
- RUNNING_ALL, COMPLETE, UNKNOWN
+ return sb.toString();
}
public enum ExitCode {
@@ -918,6 +810,26 @@ public class LlapStatusServiceDriver {
}
+ public static class LlapStatusCliException extends Exception {
+ final LlapStatusServiceDriver.ExitCode exitCode;
+
+
+ public LlapStatusCliException(LlapStatusServiceDriver.ExitCode exitCode, String message) {
+ super(exitCode.getInt() +": " + message);
+ this.exitCode = exitCode;
+ }
+
+ public LlapStatusCliException(LlapStatusServiceDriver.ExitCode exitCode, String message, Throwable cause) {
+ super(message, cause);
+ this.exitCode = exitCode;
+ }
+
+ public LlapStatusServiceDriver.ExitCode getExitCode() {
+ return exitCode;
+ }
+ }
+
+
private static void logError(Throwable t) {
LOG.error("FAILED: " + t.getMessage(), t);
System.err.println("FAILED: " + t.getMessage());
@@ -927,6 +839,9 @@ public class LlapStatusServiceDriver {
public static void main(String[] args) {
LOG.info("LLAP status invoked with arguments = {}", Arrays.toString(args));
int ret = ExitCode.SUCCESS.getInt();
+ Clock clock = new SystemClock();
+ long startTime = clock.getTime();
+ long lastSummaryLogTime = -1;
LlapStatusServiceDriver statusServiceDriver = null;
LlapStatusOptions options = null;
@@ -937,7 +852,8 @@ public class LlapStatusServiceDriver {
statusServiceDriver.close();
logError(t);
if (t instanceof LlapStatusCliException) {
- LlapStatusCliException ce = (LlapStatusCliException) t;
+ LlapStatusCliException
+ ce = (LlapStatusCliException) t;
ret = ce.getExitCode().getInt();
} else {
ret = ExitCode.INTERNAL_ERROR.getInt();
@@ -950,12 +866,14 @@ public class LlapStatusServiceDriver {
System.exit(ret);
}
+ boolean firstAttempt = true;
final long refreshInterval = options.getRefreshIntervalMs();
final boolean watchMode = options.isWatchMode();
final long watchTimeout = options.getWatchTimeoutMs();
long numAttempts = watchTimeout / refreshInterval;
- State launchingState = null;
- State currentState = null;
+ numAttempts = watchMode ? numAttempts : 1; // Break out of the loop fast if watchMode is disabled.
+ LlapStatusHelpers.State launchingState = null;
+ LlapStatusHelpers.State currentState = null;
boolean desiredStateAttained = false;
final float runningNodesThreshold = options.getRunningNodesThreshold();
try (OutputStream os = options.getOutputFile() == null ? System.out :
@@ -969,28 +887,62 @@ public class LlapStatusServiceDriver {
numAttempts, watchMode, new DecimalFormat("#.###").format(runningNodesThreshold));
while (numAttempts > 0) {
try {
+ if (!firstAttempt) {
+ if (watchMode) {
+ try {
+ Thread.sleep(refreshInterval);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ } else {
+ // reported once, so break
+ break;
+ }
+ } else {
+ firstAttempt = false;
+ }
ret = statusServiceDriver.run(options, watchMode ? watchTimeout : 0);
+ currentState = statusServiceDriver.appStatusBuilder.getState();
+ try {
+ lastSummaryLogTime = LlapStatusServiceDriver
+ .maybeLogSummary(clock, lastSummaryLogTime, statusServiceDriver,
+ watchMode, watchTimeout, launchingState);
+ } catch (Exception e) {
+ LOG.warn("Failed to log summary", e);
+ }
+
if (ret == ExitCode.SUCCESS.getInt()) {
if (watchMode) {
- currentState = statusServiceDriver.appStatusBuilder.state;
// slider has started llap application, now if for some reason state changes to COMPLETE then fail fast
if (launchingState == null &&
- (currentState.equals(State.LAUNCHING) || currentState.equals(State.RUNNING_PARTIAL))) {
+ (EnumSet.of(LlapStatusHelpers.State.LAUNCHING,
+ LlapStatusHelpers.State.RUNNING_PARTIAL,
+ LlapStatusHelpers.State.RUNNING_ALL)
+ .contains(currentState))) {
launchingState = currentState;
}
- if (launchingState != null && currentState.equals(State.COMPLETE)) {
+ if (launchingState != null && currentState.equals(
+ LlapStatusHelpers.State.COMPLETE)) {
LOG.warn("Application stopped while launching. COMPLETE state reached while waiting for RUNNING state."
+ " Failing " + "fast..");
break;
}
- if (!(currentState.equals(State.RUNNING_PARTIAL) || currentState.equals(State.RUNNING_ALL))) {
- LOG.warn("Current state: {}. Desired state: {}. {}/{} instances.", currentState,
- runningNodesThreshold == 1.0f ? State.RUNNING_ALL : State.RUNNING_PARTIAL,
- statusServiceDriver.appStatusBuilder.getLiveInstances(),
- statusServiceDriver.appStatusBuilder.getDesiredInstances());
+ if (!(currentState.equals(LlapStatusHelpers.State.RUNNING_PARTIAL) || currentState.equals(
+ LlapStatusHelpers.State.RUNNING_ALL))) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Current state: {}. Desired state: {}. {}/{} instances.",
+ currentState,
+ runningNodesThreshold == 1.0f ?
+ LlapStatusHelpers.State.RUNNING_ALL :
+ LlapStatusHelpers.State.RUNNING_PARTIAL,
+ statusServiceDriver.appStatusBuilder.getLiveInstances(),
+ statusServiceDriver.appStatusBuilder
+ .getDesiredInstances());
+ }
numAttempts--;
continue;
}
@@ -1001,11 +953,17 @@ public class LlapStatusServiceDriver {
if (desiredInstances > 0) {
final float ratio = (float) liveInstances / (float) desiredInstances;
if (ratio < runningNodesThreshold) {
- LOG.warn("Waiting until running nodes threshold is reached. Current: {} Desired: {}." +
- " {}/{} instances.", new DecimalFormat("#.###").format(ratio),
- new DecimalFormat("#.###").format(runningNodesThreshold),
- statusServiceDriver.appStatusBuilder.getLiveInstances(),
- statusServiceDriver.appStatusBuilder.getDesiredInstances());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Waiting until running nodes threshold is reached. Current: {} Desired: {}." +
+ " {}/{} instances.",
+ new DecimalFormat("#.###").format(ratio),
+ new DecimalFormat("#.###")
+ .format(runningNodesThreshold),
+ statusServiceDriver.appStatusBuilder.getLiveInstances(),
+ statusServiceDriver.appStatusBuilder
+ .getDesiredInstances());
+ }
numAttempts--;
continue;
} else {
@@ -1036,18 +994,14 @@ public class LlapStatusServiceDriver {
}
break;
} finally {
- if (watchMode) {
- try {
- Thread.sleep(refreshInterval);
- } catch (InterruptedException e) {
- // ignore
- }
- } else {
- // reported once, so break
- break;
- }
+ // TODO Remove this before commit.
}
}
+ // Log final state to CONSOLE_LOGGER
+ LlapStatusServiceDriver
+ .maybeLogSummary(clock, 0L, statusServiceDriver,
+ watchMode, watchTimeout, launchingState);
+ CONSOLE_LOGGER.info("\n\n\n");
// print current state before exiting
statusServiceDriver.outputJson(pw);
os.flush();
@@ -1059,7 +1013,8 @@ public class LlapStatusServiceDriver {
} catch (Throwable t) {
logError(t);
if (t instanceof LlapStatusCliException) {
- LlapStatusCliException ce = (LlapStatusCliException) t;
+ LlapStatusCliException
+ ce = (LlapStatusCliException) t;
ret = ce.getExitCode().getInt();
} else {
ret = ExitCode.INTERNAL_ERROR.getInt();
@@ -1074,6 +1029,40 @@ public class LlapStatusServiceDriver {
System.exit(ret);
}
+ /**
+ * Prints a status summary to CONSOLE_LOGGER when the previous one is older than
+ * LOG_SUMMARY_INTERVAL, measured with {@code clock}.
+ * <p>
+ * When {@code lastSummaryLogTime} is -1, a header is printed before the summary. In watch mode,
+ * if no launching/running state has been observed yet and the current state is COMPLETE, the
+ * summary reads "Awaiting LLAP launch" and the completed instances are cleared from the builder.
+ *
+ * @param clock source of the current time
+ * @param lastSummaryLogTime time of the previous summary; -1 means none was printed yet
+ * @param statusServiceDriver driver whose appStatusBuilder is summarized
+ * @param watchMode whether the CLI runs in watch mode
+ * @param watchTimeout watch timeout in milliseconds (printed in seconds in the header)
+ * @param launchingState first LAUNCHING/RUNNING_* state observed by the caller, or null
+ * @return the time at which the summary was printed, or {@code lastSummaryLogTime} unchanged
+ * when no summary was due
+ */
+ private static long maybeLogSummary(Clock clock, long lastSummaryLogTime,
+ LlapStatusServiceDriver statusServiceDriver,
+ boolean watchMode, long watchTimeout, LlapStatusHelpers.State launchingState) {
+ final long now = clock.getTime();
+ if (lastSummaryLogTime >= now - LOG_SUMMARY_INTERVAL) {
+ // Previous summary is recent enough; print nothing.
+ return lastSummaryLogTime;
+ }
+
+ final boolean awaitingLaunch = watchMode && launchingState == null
+ && statusServiceDriver.appStatusBuilder.getState() == LlapStatusHelpers.State.COMPLETE;
+ final String summary;
+ if (awaitingLaunch) {
+ // The only state seen so far is COMPLETE, so wait for the launch to begin and drop the
+ // completed instances rather than report them.
+ summary = "Awaiting LLAP launch";
+ statusServiceDriver.appStatusBuilder.clearCompletedLlapInstances();
+ } else {
+ summary = constructDiagnostics(statusServiceDriver.appStatusBuilder);
+ }
+
+ final String separator =
+ "--------------------------------------------------------------------------------";
+ if (lastSummaryLogTime == -1) {
+ // First summary: print the header.
+ if (!watchMode) {
+ CONSOLE_LOGGER.info("\nLLAPSTATUS");
+ } else {
+ CONSOLE_LOGGER.info("\nLLAPSTATUS WatchMode with timeout={} s",
+ TimeUnit.SECONDS.convert(watchTimeout, TimeUnit.MILLISECONDS));
+ }
+ CONSOLE_LOGGER.info(separator);
+ }
+ CONSOLE_LOGGER.info(summary);
+ CONSOLE_LOGGER.info(separator);
+ return now;
+ }
+
private void close() {
if (sliderClient != null) {
sliderClient.stop();
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/cli/status/LlapStatusHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/status/LlapStatusHelpers.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/status/LlapStatusHelpers.java
new file mode 100644
index 0000000..187f4c3
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/status/LlapStatusHelpers.java
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.status;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.llap.cli.LlapStatusServiceDriver;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+public class LlapStatusHelpers {
+ /**
+ * Overall state of the LLAP application as reported by the status tool.
+ * <p>
+ * NOTE(review): RUNNING_PARTIAL vs RUNNING_ALL presumably distinguishes some vs all desired
+ * instances running — confirm against the driver. Declaration order (ordinals) is preserved.
+ */
+ public enum State {
+ APP_NOT_FOUND,
+ LAUNCHING,
+ RUNNING_PARTIAL,
+ RUNNING_ALL,
+ COMPLETE,
+ UNKNOWN
+ }
+
+ /**
+ * Descriptive fields for the application master ("Am", going by the class name) of the LLAP
+ * application: application name, type and id, the AM container id, hostname and web URL.
+ * All fields are null until set. Setters return {@code this} so calls can be chained.
+ * NOTE(review): the getters appear to define the JSON written by outputJson; keep their names
+ * and order stable.
+ */
+ public static class AmInfo {
+ private String appName;
+ private String appType;
+ private String appId;
+ private String containerId;
+ private String hostname;
+ private String amWebUrl;
+
+ // Fluent setters.
+ public AmInfo setAppName(String appName) {
+ this.appName = appName;
+ return this;
+ }
+
+ public AmInfo setAppType(String appType) {
+ this.appType = appType;
+ return this;
+ }
+
+ public AmInfo setAppId(String appId) {
+ this.appId = appId;
+ return this;
+ }
+
+ public AmInfo setContainerId(String containerId) {
+ this.containerId = containerId;
+ return this;
+ }
+
+ public AmInfo setHostname(String hostname) {
+ this.hostname = hostname;
+ return this;
+ }
+
+ public AmInfo setAmWebUrl(String amWebUrl) {
+ this.amWebUrl = amWebUrl;
+ return this;
+ }
+
+ // Getters; each returns null if the field was never set.
+ public String getAppName() {
+ return appName;
+ }
+
+ public String getAppType() {
+ return appType;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public String getContainerId() {
+ return containerId;
+ }
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public String getAmWebUrl() {
+ return amWebUrl;
+ }
+
+ /** Debug representation listing every field, with values single-quoted. */
+ @Override
+ public String toString() {
+ return "AmInfo{" +
+ "appName='" + appName + '\'' +
+ ", appType='" + appType + '\'' +
+ ", appId='" + appId + '\'' +
+ ", containerId='" + containerId + '\'' +
+ ", hostname='" + hostname + '\'' +
+ ", amWebUrl='" + amWebUrl + '\'' +
+ '}';
+ }
+ }
+
+ /**
+ * Describes a single LLAP daemon instance, identified by its hostname and container id (both
+ * fixed at construction). All other fields are filled in through fluent setters that return
+ * {@code this}.
+ * <p>
+ * NOTE(review): the getters appear to define the JSON written by outputJson; keep their names
+ * and declaration order stable.
+ */
+ public static class LlapInstance {
+ private final String hostname;
+ private final String containerId;
+ private String logUrl;
+
+ // Only for live instances.
+ private String statusUrl;
+ private String webUrl;
+ // Boxed: these stay null until the corresponding setter is called.
+ private Integer rpcPort;
+ private Integer mgmtPort;
+ private Integer shufflePort;
+
+ // For completed instances
+ private String diagnostics;
+ // 0 until setYarnContainerExitStatus is called.
+ private int yarnContainerExitStatus;
+
+ // TODO HIVE-13454 Add additional information such as #executors, container size, etc
+
+ /**
+ * @param hostname hostname of the daemon
+ * @param containerId container id of the daemon; AppStatusBuilder uses it as a lookup key
+ */
+ public LlapInstance(String hostname, String containerId) {
+ this.hostname = hostname;
+ this.containerId = containerId;
+ }
+
+ public LlapInstance setLogUrl(String logUrl) {
+ this.logUrl = logUrl;
+ return this;
+ }
+
+ public LlapInstance setWebUrl(String webUrl) {
+ this.webUrl = webUrl;
+ return this;
+ }
+
+ public LlapInstance setStatusUrl(String statusUrl) {
+ this.statusUrl = statusUrl;
+ return this;
+ }
+
+ public LlapInstance setRpcPort(int rpcPort) {
+ this.rpcPort = rpcPort;
+ return this;
+ }
+
+ public LlapInstance setMgmtPort(int mgmtPort) {
+ this.mgmtPort = mgmtPort;
+ return this;
+ }
+
+ public LlapInstance setShufflePort(int shufflePort) {
+ this.shufflePort = shufflePort;
+ return this;
+ }
+
+ public LlapInstance setDiagnostics(String diagnostics) {
+ this.diagnostics = diagnostics;
+ return this;
+ }
+
+ public LlapInstance setYarnContainerExitStatus(int yarnContainerExitStatus) {
+ this.yarnContainerExitStatus = yarnContainerExitStatus;
+ return this;
+ }
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public String getLogUrl() {
+ return logUrl;
+ }
+
+ public String getStatusUrl() {
+ return statusUrl;
+ }
+
+ public String getContainerId() {
+ return containerId;
+ }
+
+ public String getWebUrl() {
+ return webUrl;
+ }
+
+ /** @return the RPC port, or null if it was never set */
+ public Integer getRpcPort() {
+ return rpcPort;
+ }
+
+ /** @return the management port, or null if it was never set */
+ public Integer getMgmtPort() {
+ return mgmtPort;
+ }
+
+ /** @return the shuffle port, or null if it was never set */
+ public Integer getShufflePort() {
+ return shufflePort;
+ }
+
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+
+ public int getYarnContainerExitStatus() {
+ return yarnContainerExitStatus;
+ }
+
+ /** Debug representation; every field after the first is preceded by ", ". */
+ @Override
+ public String toString() {
+ return "LlapInstance{" +
+ "hostname='" + hostname + '\'' +
+ ", logUrl='" + logUrl + '\'' +
+ ", containerId='" + containerId + '\'' +
+ ", statusUrl='" + statusUrl + '\'' +
+ ", webUrl='" + webUrl + '\'' +
+ ", rpcPort=" + rpcPort +
+ ", mgmtPort=" + mgmtPort +
+ ", shufflePort=" + shufflePort +
+ ", diagnostics=" + diagnostics +
+ ", yarnContainerExitStatus=" + yarnContainerExitStatus +
+ '}';
+ }
+ }
+
+ /**
+ * Mutable accumulator for the status of an LLAP application: AM info, overall {@link State},
+ * instance counts, timestamps, and the running and completed daemon instances.
+ * Most setters return {@code this} for chaining; setAmInfo and setRunningThresholdAchieved
+ * return void.
+ * <p>
+ * Running and completed instances are each tracked twice: in an ordered list, which the getters
+ * return, and in a map keyed by container id, used for lookup. No synchronization is used.
+ * <p>
+ * NOTE(review): the getters appear to define the JSON written by outputJson (helpers that must
+ * not be serialized carry {@code @JsonIgnore}); renaming or reordering them may change that output.
+ */
+ public static final class AppStatusBuilder {
+
+ private AmInfo amInfo;
+ // UNKNOWN until setState is called.
+ private State state = State.UNKNOWN;
+ private String diagnostics;
+ private String originalConfigurationPath;
+ private String generatedConfigurationPath;
+
+ // Instance counts; null until the corresponding setter is called.
+ private Integer desiredInstances = null;
+ private Integer liveInstances = null;
+ private Integer launchingInstances = null;
+
+
+ // Null until set. NOTE(review): units are not visible here (presumably epoch millis) — confirm.
+ private Long appStartTime;
+ private Long appFinishTime;
+
+ private boolean runningThresholdAchieved = false;
+
+ // Ordered instance lists, returned by getRunningInstances / getCompletedInstances.
+ private final List<LlapInstance> runningInstances = new LinkedList<>();
+ private final List<LlapInstance> completedInstances = new LinkedList<>();
+
+ // Container id -> instance lookups. The add and clear methods update them together with the
+ // lists; the removeAndGet...ForContainer methods touch only these maps. They have no getters.
+ private transient final Map<String, LlapInstance>
+ containerToRunningInstanceMap = new HashMap<>();
+ private transient final Map<String, LlapInstance>
+ containerToCompletedInstanceMap = new HashMap<>();
+
+ public void setAmInfo(AmInfo amInfo) {
+ this.amInfo = amInfo;
+ }
+
+ public AppStatusBuilder setState(
+ State state) {
+ this.state = state;
+ return this;
+ }
+
+ public AppStatusBuilder setDiagnostics(String diagnostics) {
+ this.diagnostics = diagnostics;
+ return this;
+ }
+
+ public AppStatusBuilder setOriginalConfigurationPath(String originalConfigurationPath) {
+ this.originalConfigurationPath = originalConfigurationPath;
+ return this;
+ }
+
+ public AppStatusBuilder setGeneratedConfigurationPath(String generatedConfigurationPath) {
+ this.generatedConfigurationPath = generatedConfigurationPath;
+ return this;
+ }
+
+ public AppStatusBuilder setAppStartTime(long appStartTime) {
+ this.appStartTime = appStartTime;
+ return this;
+ }
+
+ public AppStatusBuilder setAppFinishTime(long finishTime) {
+ this.appFinishTime = finishTime;
+ return this;
+ }
+
+ public void setRunningThresholdAchieved(boolean runningThresholdAchieved) {
+ this.runningThresholdAchieved = runningThresholdAchieved;
+ }
+
+ public AppStatusBuilder setDesiredInstances(int desiredInstances) {
+ this.desiredInstances = desiredInstances;
+ return this;
+ }
+
+ public AppStatusBuilder setLiveInstances(int liveInstances) {
+ this.liveInstances = liveInstances;
+ return this;
+ }
+
+ public AppStatusBuilder setLaunchingInstances(int launchingInstances) {
+ this.launchingInstances = launchingInstances;
+ return this;
+ }
+
+ /** Appends the instance to the running list and indexes it by its container id. */
+ public AppStatusBuilder addNewRunningLlapInstance(LlapInstance llapInstance) {
+ this.runningInstances.add(llapInstance);
+ this.containerToRunningInstanceMap
+ .put(llapInstance.getContainerId(), llapInstance);
+ return this;
+ }
+
+ /**
+ * Removes and returns the running instance mapped to the given container id, or returns null
+ * if none is mapped. Only the lookup map changes; the running-instances list is left as is.
+ */
+ public LlapInstance removeAndGetRunningLlapInstanceForContainer(String containerIdString) {
+ return containerToRunningInstanceMap.remove(containerIdString);
+ }
+
+ /** Clears both the running-instances list and its lookup map. */
+ public void clearRunningLlapInstances() {
+ this.runningInstances.clear();
+ this.containerToRunningInstanceMap.clear();
+ }
+
+ /** Replaces all running instances (list and map) with the given ones, keeping their order. */
+ public AppStatusBuilder clearAndAddPreviouslyKnownRunningInstances(List<LlapInstance> llapInstances) {
+ clearRunningLlapInstances();
+ for (LlapInstance llapInstance : llapInstances) {
+ addNewRunningLlapInstance(llapInstance);
+ }
+ return this;
+ }
+
+ /** Returns the internal running-instances list itself (not a copy). */
+ @JsonIgnore
+ public List<LlapInstance> allRunningInstances() {
+ return this.runningInstances;
+ }
+
+ /** Appends the instance to the completed list and indexes it by its container id. */
+ public AppStatusBuilder addNewCompleteLlapInstance(LlapInstance llapInstance) {
+ this.completedInstances.add(llapInstance);
+ this.containerToCompletedInstanceMap
+ .put(llapInstance.getContainerId(), llapInstance);
+ return this;
+ }
+
+ /**
+ * Removes and returns the completed instance mapped to the given container id, or returns null
+ * if none is mapped. Only the lookup map changes; the completed-instances list is left as is.
+ */
+ public LlapInstance removeAndGetCompletedLlapInstanceForContainer(String containerIdString) {
+ return containerToCompletedInstanceMap.remove(containerIdString);
+ }
+
+ /** Clears both the completed-instances list and its lookup map. */
+ public void clearCompletedLlapInstances() {
+ this.completedInstances.clear();
+ this.containerToCompletedInstanceMap.clear();
+ }
+
+ /** Replaces all completed instances (list and map) with the given ones, keeping their order. */
+ public AppStatusBuilder clearAndAddPreviouslyKnownCompletedInstances(List<LlapInstance> llapInstances) {
+ clearCompletedLlapInstances();
+ for (LlapInstance llapInstance : llapInstances) {
+ addNewCompleteLlapInstance(llapInstance);
+ }
+ return this;
+ }
+
+ /** Returns the internal completed-instances list itself (not a copy). */
+ @JsonIgnore
+ public List<LlapInstance> allCompletedInstances() {
+ return this.completedInstances;
+ }
+
+ // Getters. Boxed values are null if never set.
+ public AmInfo getAmInfo() {
+ return amInfo;
+ }
+
+ public State getState() {
+ return state;
+ }
+
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+
+ public String getOriginalConfigurationPath() {
+ return originalConfigurationPath;
+ }
+
+ public String getGeneratedConfigurationPath() {
+ return generatedConfigurationPath;
+ }
+
+ public Integer getDesiredInstances() {
+ return desiredInstances;
+ }
+
+ public Integer getLiveInstances() {
+ return liveInstances;
+ }
+
+ public Integer getLaunchingInstances() {
+ return launchingInstances;
+ }
+
+ public Long getAppStartTime() {
+ return appStartTime;
+ }
+
+ public Long getAppFinishTime() {
+ return appFinishTime;
+ }
+
+ public boolean isRunningThresholdAchieved() {
+ return runningThresholdAchieved;
+ }
+
+ // Returns the same internal list as allRunningInstances().
+ public List<LlapInstance> getRunningInstances() {
+ return runningInstances;
+ }
+
+ // Returns the same internal list as allCompletedInstances().
+ public List<LlapInstance> getCompletedInstances() {
+ return completedInstances;
+ }
+
+ /** Returns the AmInfo, first creating and storing an empty one if none is set. */
+ @JsonIgnore
+ public AmInfo maybeCreateAndGetAmInfo() {
+ if (amInfo == null) {
+ amInfo = new AmInfo();
+ }
+ return amInfo;
+ }
+
+ // Debug representation. Note: containerToCompletedInstanceMap is not included.
+ @Override
+ public String toString() {
+ return "AppStatusBuilder{" +
+ "amInfo=" + amInfo +
+ ", state=" + state +
+ ", diagnostics=" + diagnostics +
+ ", originalConfigurationPath='" + originalConfigurationPath + '\'' +
+ ", generatedConfigurationPath='" + generatedConfigurationPath + '\'' +
+ ", desiredInstances=" + desiredInstances +
+ ", liveInstances=" + liveInstances +
+ ", launchingInstances=" + launchingInstances +
+ ", appStartTime=" + appStartTime +
+ ", appFinishTime=" + appFinishTime +
+ ", runningThresholdAchieved=" + runningThresholdAchieved +
+ ", runningInstances=" + runningInstances +
+ ", completedInstances=" + completedInstances +
+ ", containerToRunningInstanceMap=" + containerToRunningInstanceMap +
+ '}';
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java b/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
index 88f3b19..7219d36 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
@@ -36,7 +36,7 @@ public class LlapDaemonConfiguration extends Configuration {
public static final String[] SSL_DAEMON_CONFIGS = { "ssl-server.xml" };
public LlapDaemonConfiguration() {
- super(false);
+ super(true); // Load the defaults.
for (String conf : DAEMON_CONFIGS) {
addResource(conf);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 82bbcf3..e030a76 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -524,4 +524,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
return queryId + "-" + dagIndex;
}
+ /**
+ * Returns the number of active executors, delegating to the executor service.
+ */
+ public int getNumActive() {
+ final int activeCount = executorService.getNumActive();
+ return activeCount;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
index a80bb9b..8fe59d4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
@@ -53,6 +53,11 @@ public class EvictingPriorityBlockingQueue<E> {
currentSize++;
return null;
} else {
+ if (isEmpty()) {
+ // Empty queue. But no capacity available, due to waitQueueSize and additionalElementsAllowed
+ // Return the element.
+ return e;
+ }
// No capacity. Check if an element needs to be evicted.
E last = deque.peekLast();
if (comparator.compare(e, last) < 0) {
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 95bc675..aae146e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -15,14 +15,17 @@
package org.apache.hadoop.hive.llap.daemon.impl;
import org.apache.hadoop.hive.llap.LlapOutputFormatService;
+
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.net.InetSocketAddress;
import java.net.URL;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -38,6 +41,7 @@ import org.apache.hadoop.hive.common.UgiFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.DaemonId;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
@@ -173,28 +177,33 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE);
boolean enablePreemption = HiveConf.getBoolVar(
daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION);
- LOG.warn("Attempting to start LlapDaemonConf with the following configuration: " +
- "maxJvmMemory=" + maxJvmMemory + " ("
- + LlapUtil.humanReadableByteCount(maxJvmMemory) + ")" +
- ", requestedExecutorMemory=" + executorMemoryBytes +
- " (" + LlapUtil.humanReadableByteCount(executorMemoryBytes) + ")" +
- ", llapIoCacheSize=" + ioMemoryBytes + " ("
- + LlapUtil.humanReadableByteCount(ioMemoryBytes) + ")" +
- ", xmxHeadRoomMemory=" + xmxHeadRoomBytes + " ("
- + LlapUtil.humanReadableByteCount(xmxHeadRoomBytes) + ")" +
- ", adjustedExecutorMemory=" + executorMemoryPerInstance +
- " (" + LlapUtil.humanReadableByteCount(executorMemoryPerInstance) + ")" +
- ", numExecutors=" + numExecutors +
- ", llapIoEnabled=" + ioEnabled +
- ", llapIoCacheIsDirect=" + isDirectCache +
- ", rpcListenerPort=" + srvPort +
- ", mngListenerPort=" + mngPort +
- ", webPort=" + webPort +
- ", outputFormatSvcPort=" + outputFormatServicePort +
- ", workDirs=" + Arrays.toString(localDirs) +
- ", shufflePort=" + shufflePort +
- ", waitQueueSize= " + waitQueueSize +
- ", enablePreemption= " + enablePreemption);
+ final String logMsg = "Attempting to start LlapDaemon with the following configuration: " +
+ "maxJvmMemory=" + maxJvmMemory + " ("
+ + LlapUtil.humanReadableByteCount(maxJvmMemory) + ")" +
+ ", requestedExecutorMemory=" + executorMemoryBytes +
+ " (" + LlapUtil.humanReadableByteCount(executorMemoryBytes) + ")" +
+ ", llapIoCacheSize=" + ioMemoryBytes + " ("
+ + LlapUtil.humanReadableByteCount(ioMemoryBytes) + ")" +
+ ", xmxHeadRoomMemory=" + xmxHeadRoomBytes + " ("
+ + LlapUtil.humanReadableByteCount(xmxHeadRoomBytes) + ")" +
+ ", adjustedExecutorMemory=" + executorMemoryPerInstance +
+ " (" + LlapUtil.humanReadableByteCount(executorMemoryPerInstance) + ")" +
+ ", numExecutors=" + numExecutors +
+ ", llapIoEnabled=" + ioEnabled +
+ ", llapIoCacheIsDirect=" + isDirectCache +
+ ", rpcListenerPort=" + srvPort +
+ ", mngListenerPort=" + mngPort +
+ ", webPort=" + webPort +
+ ", outputFormatSvcPort=" + outputFormatServicePort +
+ ", workDirs=" + Arrays.toString(localDirs) +
+ ", shufflePort=" + shufflePort +
+ ", waitQueueSize= " + waitQueueSize +
+ ", enablePreemption= " + enablePreemption;
+ LOG.info(logMsg);
+ final String currTSISO8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(new Date());
+ // Time based log retrieval may not fetch the above log line so logging to stderr for debugging purpose.
+ System.err.println(currTSISO8601 + " " + logMsg);
+
long memRequired =
executorMemoryBytes + (ioEnabled && isDirectCache == false ? ioMemoryBytes : 0);
@@ -256,8 +265,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
this.metrics.setCacheMemoryPerInstance(ioMemoryBytes);
this.metrics.setJvmMaxMemory(maxJvmMemory);
this.metrics.setWaitQueueSize(waitQueueSize);
- // TODO: Has to be reverted in HIVE-15644
- //this.metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
+ this.metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
this.llapDaemonInfoBean = MBeans.register("LlapDaemon", "LlapDaemonInfo", this);
LOG.info("Started LlapMetricsSystem with displayName: " + displayName +
" sessionId: " + sessionId);
@@ -336,7 +344,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
System.setProperty("isThreadContextMapInheritable", "true");
Configurator.initialize("LlapDaemonLog4j2", llap_l4j2.toString());
long end = System.currentTimeMillis();
- LOG.warn("LLAP daemon logging initialized from {} in {} ms. Async: {}",
+ LOG.debug("LLAP daemon logging initialized from {} in {} ms. Async: {}",
llap_l4j2, (end - start), async);
} else {
throw new RuntimeException("Log initialization failed." +
@@ -378,7 +386,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
"$$$$$$$$\\ $$$$$$$$\\ $$ | $$ |$$ |\n" +
"\\________|\\________|\\__| \\__|\\__|\n" +
"\n";
- LOG.warn("\n\n" + asciiArt);
+ LOG.info("\n\n" + asciiArt);
}
@Override
@@ -500,8 +508,6 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
nmHost, nmPort);
}
- int numExecutors = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
-
String workDirsString = System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name());
String localDirList = LlapUtil.getDaemonLocalDirString(daemonConf, workDirsString);
@@ -512,16 +518,19 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
int shufflePort = daemonConf
.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
int webPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_WEB_PORT);
- long executorMemoryBytes = HiveConf.getIntVar(
- daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l;
- long ioMemoryBytes = HiveConf.getSizeVar(daemonConf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
- boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT);
- boolean isLlapIo = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED, true);
+
+ LlapDaemonInfo.initialize(appName, daemonConf);
+
+ int numExecutors = LlapDaemonInfo.INSTANCE.getNumExecutors();
+ long executorMemoryBytes = LlapDaemonInfo.INSTANCE.getExecutorMemory();
+ long ioMemoryBytes = LlapDaemonInfo.INSTANCE.getCacheSize();
+ boolean isDirectCache = LlapDaemonInfo.INSTANCE.isDirectCache();
+ boolean isLlapIo = LlapDaemonInfo.INSTANCE.isLlapIo();
LlapDaemon.initializeLogging(daemonConf);
- llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo,
- isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort,
- appName);
+ llapDaemon =
+ new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo, isDirectCache,
+ ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort, appName);
LOG.info("Adding shutdown hook for LlapDaemon");
ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1);
@@ -532,7 +541,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
// Relying on the RPC threads to keep the service alive.
} catch (Throwable t) {
// TODO Replace this with a ExceptionHandler / ShutdownHook
- LOG.warn("Failed to start LLAP Daemon with exception", t);
+ LOG.error("Failed to start LLAP Daemon with exception", t);
if (llapDaemon != null) {
llapDaemon.shutdown();
}
@@ -601,6 +610,11 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
}
@Override
+ public int getNumActive() {
+ // Number of active executors, as reported by the container runner.
+ final int activeCount = containerRunner.getNumActive();
+ return activeCount;
+ }
+
+ @Override
public long getExecutorMemoryPerInstance() {
return executorMemoryPerInstance;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java
index d6449db..22cfc9e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java
@@ -40,6 +40,12 @@ public interface LlapDaemonMXBean {
public int getNumExecutors();
/**
+ * Gets the number of active executors, i.e. executors currently running a task.
+ * Compare with {@link #getNumExecutors()}.
+ * NOTE(review): the TaskExecutorService implementation does not count tasks still in the wait
+ * queue or not yet started.
+ * @return number of active executors
+ */
+ public int getNumActive();
+
+ /**
* Gets the shuffle port.
* @return the shuffle port
*/
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
index fd6234a..f199593 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
@@ -45,5 +45,7 @@ public interface Scheduler<T> {
Set<String> getExecutorsStatus();
+ /**
+ * Returns the number of currently active tasks.
+ * NOTE(review): TaskExecutorService counts known tasks that are outside the wait queue and whose
+ * TaskRunnerCallable has a non-zero start time.
+ */
+ int getNumActive();
+
QueryIdentifier findQueryByFragment(String fragmentId);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 9eaa7d7..7f8c947 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -201,6 +201,20 @@ public class TaskExecutorService extends AbstractService
};
@Override
+ public int getNumActive() {
+ // Counts tasks that are out of the wait queue and have actually started. A task whose
+ // callable is missing or has a zero start time is in an intermediate state and is treated as
+ // waiting, not active.
+ int activeTasks = 0;
+ for (TaskWrapper wrapper : knownTasks.values()) {
+ if (wrapper.isInWaitQueue()) {
+ continue;
+ }
+ TaskRunnerCallable callable = wrapper.getTaskRunnerCallable();
+ boolean started = callable != null && callable.getStartTime() != 0;
+ if (started) {
+ activeTasks++;
+ }
+ }
+ return activeTasks;
+ }
+
+ @Override
public Set<String> getExecutorsStatus() {
// TODO Change this method to make the output easier to parse (parse programmatically)
Set<String> result = new LinkedHashSet<>();
@@ -277,7 +291,8 @@ public class TaskExecutorService extends AbstractService
}
// If the task cannot finish and if no slots are available then don't schedule it.
// Also don't wait if we have a task and we just killed something to schedule it.
- boolean shouldWait = numSlotsAvailable.get() == 0 && lastKillTimeMs == null;
+ // (numSlotsAvailable can go negative, if the callback after the thread completes is delayed)
+ boolean shouldWait = numSlotsAvailable.get() <= 0 && lastKillTimeMs == null;
if (task.getTaskRunnerCallable().canFinish()) {
if (isDebugEnabled) {
LOG.debug("Attempting to schedule task {}, canFinish={}. Current state: "
@@ -728,8 +743,8 @@ public class TaskExecutorService extends AbstractService
knownTasks.remove(taskWrapper.getRequestId());
taskWrapper.setIsInPreemptableQueue(false);
taskWrapper.maybeUnregisterForFinishedStateNotifications();
- taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result);
updatePreemptionListAndNotify(result.getEndReason());
+ taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result);
}
@Override
@@ -742,8 +757,8 @@ public class TaskExecutorService extends AbstractService
knownTasks.remove(taskWrapper.getRequestId());
taskWrapper.setIsInPreemptableQueue(false);
taskWrapper.maybeUnregisterForFinishedStateNotifications();
- taskWrapper.getTaskRunnerCallable().getCallback().onFailure(t);
updatePreemptionListAndNotify(null);
+ taskWrapper.getTaskRunnerCallable().getCallback().onFailure(t);
LOG.error("Failed notification received: Stacktrace: " + ExceptionUtils.getStackTrace(t));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index c077d75..1669815 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -407,6 +407,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
taskReporter.shutdown();
}
if (umbilical != null) {
+ // TODO: Can this be moved out of the main callback path
RPC.stopProxy(umbilical);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapIoMemoryServlet.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapIoMemoryServlet.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapIoMemoryServlet.java
new file mode 100644
index 0000000..3386cb4
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapIoMemoryServlet.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.services.impl;
+
+import java.io.PrintWriter;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.llap.io.api.LlapIo;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hive.http.HttpServer;
+
+/**
+ * Serves the LLAP IO memory dump (registered at /iomem) as plain text.
+ */
+@SuppressWarnings("serial")
+public class LlapIoMemoryServlet extends HttpServlet {
+
+  private static final Log LOG = LogFactory.getLog(LlapIoMemoryServlet.class);
+  static final String ACCESS_CONTROL_ALLOW_METHODS = "Access-Control-Allow-Methods";
+  static final String ACCESS_CONTROL_ALLOW_ORIGIN = "Access-Control-Allow-Origin";
+
+  /**
+   * Initialize this servlet. No per-servlet state is required.
+   */
+  @Override
+  public void init() throws ServletException {
+  }
+
+  /**
+   * Writes the LLAP IO memory dump ({@link LlapIo#getMemoryInfo()}) as plain text.
+   *
+   * @param request
+   *          The servlet request; used for the instrumentation access check
+   * @param response
+   *          The servlet response; receives the dump, or status 500 on failure
+   */
+  @Override
+  public void doGet(HttpServletRequest request, HttpServletResponse response) {
+    try {
+      if (!HttpServer.isInstrumentationAccessAllowed(getServletContext(), request, response)) {
+        return;
+      }
+      // "utf-8" is the registered charset name; "utf8" is only a non-standard alias.
+      response.setContentType("text/plain; charset=utf-8");
+      response.setHeader(ACCESS_CONTROL_ALLOW_METHODS, "GET");
+      response.setHeader(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
+      response.setHeader("Cache-Control", "no-transform,public,max-age=60,s-maxage=60");
+
+      // Build the body before opening the writer: if getMemoryInfo() throws, nothing
+      // has been written or committed yet, so the 500 status set below reaches the
+      // client. (Closing the writer flushes it, which commits the response.)
+      LlapIo<?> llapIo = LlapProxy.getIo();
+      String body = (llapIo == null) ? "LLAP IO not found" : llapIo.getMemoryInfo();
+
+      // try-with-resources closes the writer on every path.
+      try (PrintWriter writer = response.getWriter()) {
+        writer.write(body);
+      }
+    } catch (Exception e) {
+      LOG.error("Caught exception while processing llap io memory request", e);
+      // A status can only be changed while the response is still uncommitted.
+      if (!response.isCommitted()) {
+        response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
index 028daa1..e896df2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
@@ -102,6 +102,7 @@ public class LlapWebServices extends AbstractService {
this.http = builder.build();
this.http.addServlet("status", "/status", LlapStatusServlet.class);
this.http.addServlet("peers", "/peers", LlapPeerRegistryServlet.class);
+ this.http.addServlet("iomem", "/iomem", LlapIoMemoryServlet.class);
} catch (IOException e) {
LOG.warn("LLAP web service failed to come up", e);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 7c309a4..294fb2b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -23,13 +23,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.management.ObjectName;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon;
import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,8 +38,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
-import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator;
import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
+import org.apache.hadoop.hive.llap.cache.LlapOomDebugDump;
import org.apache.hadoop.hive.llap.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
import org.apache.hadoop.hive.llap.cache.LowLevelCacheMemoryManager;
@@ -66,8 +65,6 @@ import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.metrics2.util.MBeans;
import com.google.common.primitives.Ints;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
@@ -85,6 +82,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
private final LlapDaemonIOMetrics ioMetrics;
private ObjectName buddyAllocatorMXBean;
private final Allocator allocator;
+ private final LlapOomDebugDump memoryDump;
private LlapIoImpl(Configuration conf) throws IOException {
String ioMode = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MODE);
@@ -121,14 +119,36 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
if (useLowLevelCache) {
// Memory manager uses cache policy to trigger evictions, so create the policy first.
boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU);
- LowLevelCachePolicy cachePolicy =
- useLrfu ? new LowLevelLrfuCachePolicy(conf) : new LowLevelFifoCachePolicy(conf);
+ long totalMemorySize = HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
+ int minAllocSize = (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
+ float metadataFraction = HiveConf.getFloatVar(conf, ConfVars.LLAP_IO_METADATA_FRACTION);
+ long metaMem = 0;
+ // TODO: this split is a workaround until HIVE-15665.
+ // Technically we don't have to do it for the on-heap data cache, but we do it anyway for testing.
+ boolean isSplitCache = metadataFraction > 0f;
+ if (isSplitCache) {
+ metaMem = (long)(LlapDaemon.getTotalHeapSize() * metadataFraction);
+ }
+ LowLevelCachePolicy cachePolicy = useLrfu ? new LowLevelLrfuCachePolicy(
+ minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy();
// Allocator uses memory manager to request memory, so create the manager next.
LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
- conf, cachePolicy, cacheMetrics);
+ totalMemorySize, cachePolicy, cacheMetrics);
+ LowLevelCachePolicy metaCachePolicy = null;
+ LowLevelCacheMemoryManager metaMemManager = null;
+ if (isSplitCache) {
+ metaCachePolicy = useLrfu ? new LowLevelLrfuCachePolicy(
+ minAllocSize, metaMem, conf) : new LowLevelFifoCachePolicy();
+ metaMemManager = new LowLevelCacheMemoryManager(metaMem, metaCachePolicy, cacheMetrics);
+ } else {
+ metaCachePolicy = cachePolicy;
+ metaMemManager = memManager;
+ }
+ cacheMetrics.setCacheCapacityTotal(totalMemorySize + metaMem);
// Cache uses allocator to allocate and deallocate, create allocator and then caches.
- EvictionAwareAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
+ BuddyAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
this.allocator = allocator;
+ this.memoryDump = allocator;
LowLevelCacheImpl cacheImpl = new LowLevelCacheImpl(
cacheMetrics, cachePolicy, allocator, true);
cache = cacheImpl;
@@ -138,15 +158,21 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
serdeCache = serdeCacheImpl;
}
boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE);
- metadataCache = new OrcMetadataCache(memManager, cachePolicy, useGapCache);
+ metadataCache = new OrcMetadataCache(metaMemManager, metaCachePolicy, useGapCache);
// And finally cache policy uses cache to notify it of eviction. The cycle is complete!
- cachePolicy.setEvictionListener(new EvictionDispatcher(
- cache, serdeCache, metadataCache, allocator));
- cachePolicy.setParentDebugDumper(cacheImpl);
+ EvictionDispatcher e = new EvictionDispatcher(cache, serdeCache, metadataCache, allocator);
+ if (isSplitCache) {
+ metaCachePolicy.setEvictionListener(e);
+ metaCachePolicy.setParentDebugDumper(e);
+ }
+ cachePolicy.setEvictionListener(e);
+ cachePolicy.setParentDebugDumper(e);
+
cacheImpl.startThreads(); // Start the cache threads.
bufferManager = cacheImpl; // Cache also serves as buffer manager.
} else {
this.allocator = new SimpleAllocator(conf);
+ memoryDump = null;
SimpleBufferManager sbm = new SimpleBufferManager(allocator, cacheMetrics);
bufferManager = sbm;
cache = sbm;
@@ -171,6 +197,14 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
buddyAllocatorMXBean = MBeans.register("LlapDaemon", "BuddyAllocatorInfo", allocator);
}
+  @Override
+  public String getMemoryInfo() {
+    if (memoryDump == null) { return "\nNot using the allocator"; } // SimpleAllocator mode
+    StringBuilder dump = new StringBuilder();
+    memoryDump.debugDumpShort(dump);
+    return dump.toString();
+  }
+
@SuppressWarnings("rawtypes")
@Override
public InputFormat<NullWritable, VectorizedRowBatch> getInputFormat(
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index ac031aa..121e169 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -79,9 +79,9 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
cacheMetrics.incrCacheReadRequests();
OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
_skipCorrupt, counters, ioMetrics);
- // Note: we use global conf here and ignore JobConf.
- OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager,
- metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters, readerSchema);
+ OrcEncodedDataReader reader = new OrcEncodedDataReader(
+ lowLevelCache, bufferManager, metadataCache, conf, job, split, columnIds, sarg,
+ columnNames, edc, counters, readerSchema);
edc.init(reader, reader);
return edc;
}