You are viewing a plain-text version of this content. The canonical link for it is "here" (a hyperlink that is not preserved in this plain-text view).
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/08 20:43:21 UTC
[31/51] [partial] hive git commit: Revert "HIVE-14671 : merge master into hive-14535 (Wei Zheng)"
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
index 1b57e38..b36d4ff 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
@@ -24,11 +24,11 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
-import java.net.URISyntaxException;
import java.text.DecimalFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -41,28 +41,20 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.cli.LlapStatusOptionsProcessor.LlapStatusOptions;
-import org.apache.hadoop.hive.llap.cli.status.LlapStatusHelpers;
-import org.apache.hadoop.hive.llap.cli.status.LlapStatusHelpers.AppStatusBuilder;
-import org.apache.hadoop.hive.llap.cli.status.LlapStatusHelpers.LlapInstance;
import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.slider.api.ClusterDescription;
import org.apache.slider.api.ClusterDescriptionKeys;
-import org.apache.slider.api.StateValues;
import org.apache.slider.api.StatusKeys;
-import org.apache.slider.api.types.ApplicationDiagnostics;
-import org.apache.slider.api.types.ContainerInformation;
import org.apache.slider.client.SliderClient;
-import org.apache.slider.common.params.ActionDiagnosticArgs;
import org.apache.slider.core.exceptions.SliderException;
+import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -72,7 +64,6 @@ import org.slf4j.LoggerFactory;
public class LlapStatusServiceDriver {
private static final Logger LOG = LoggerFactory.getLogger(LlapStatusServiceDriver.class);
- private static final Logger CONSOLE_LOGGER = LoggerFactory.getLogger("LlapStatusServiceDriverConsole");
// Defining a bunch of configs here instead of in HiveConf. These are experimental, and mainly
// for use when retry handling is fixed in Yarn/Hadoop
@@ -113,8 +104,6 @@ public class LlapStatusServiceDriver {
CONF_PREFIX + "zk-registry.timeout-ms";
private static final long CONFIG_LLAP_ZK_REGISTRY_TIMEOUT_MS_DEFAULT = 20000l;
- private static final long LOG_SUMMARY_INTERVAL = 15000L; // Log summary every ~15 seconds.
-
private static final String LLAP_KEY = "LLAP";
private final Configuration conf;
private final Clock clock = new SystemClock();
@@ -172,8 +161,7 @@ public class LlapStatusServiceDriver {
* @param args
* @return command line options.
*/
- public LlapStatusOptions parseOptions(String[] args) throws
- LlapStatusCliException {
+ public LlapStatusOptions parseOptions(String[] args) throws LlapStatusCliException {
LlapStatusOptionsProcessor optionsProcessor = new LlapStatusOptionsProcessor();
LlapStatusOptions options;
@@ -221,21 +209,16 @@ public class LlapStatusServiceDriver {
}
try {
- if (sliderClient == null) {
- sliderClient = LlapSliderUtils.createSliderClient(conf);
- }
- } catch (Exception e) {
- LlapStatusCliException le = new LlapStatusCliException(
- LlapStatusServiceDriver.ExitCode.SLIDER_CLIENT_ERROR_CREATE_FAILED,
- "Failed to create slider client", e);
- logError(le);
- return le.getExitCode().getInt();
+ sliderClient = createSliderClient();
+ } catch (LlapStatusCliException e) {
+ logError(e);
+ return e.getExitCode().getInt();
}
// Get the App report from YARN
ApplicationReport appReport;
try {
- appReport = LlapSliderUtils.getAppReport(appName, sliderClient, options.getFindAppTimeoutMs());
+ appReport = getAppReport(appName, sliderClient, options.getFindAppTimeoutMs());
} catch (LlapStatusCliException e) {
logError(e);
return e.getExitCode().getInt();
@@ -252,13 +235,13 @@ public class LlapStatusServiceDriver {
if (ret != ExitCode.SUCCESS) {
return ret.getInt();
- } else if (EnumSet.of(LlapStatusHelpers.State.APP_NOT_FOUND, LlapStatusHelpers.State.COMPLETE, LlapStatusHelpers.State.LAUNCHING)
+ } else if (EnumSet.of(State.APP_NOT_FOUND, State.COMPLETE, State.LAUNCHING)
.contains(appStatusBuilder.getState())) {
return ExitCode.SUCCESS.getInt();
} else {
// Get information from slider.
try {
- ret = populateAppStatusFromSliderStatus(appName, sliderClient, appStatusBuilder);
+ ret = populateAppStatusFromSlider(appName, sliderClient, appStatusBuilder);
} catch (LlapStatusCliException e) {
// In case of failure, send back whatever is constructed sop far - which wouldbe from the AppReport
logError(e);
@@ -266,18 +249,6 @@ public class LlapStatusServiceDriver {
}
}
-
- if (ret != ExitCode.SUCCESS) {
- return ret.getInt();
- } else {
- try {
- ret = populateAppStatusFromSliderDiagnostics(appName, sliderClient, appStatusBuilder);
- } catch (LlapStatusCliException e) {
- logError(e);
- return e.getExitCode().getInt();
- }
- }
-
if (ret != ExitCode.SUCCESS) {
return ret.getInt();
} else {
@@ -297,8 +268,7 @@ public class LlapStatusServiceDriver {
}
}
- public void outputJson(PrintWriter writer) throws
- LlapStatusCliException {
+ public void outputJson(PrintWriter writer) throws LlapStatusCliException {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);
mapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
@@ -372,27 +342,25 @@ public class LlapStatusServiceDriver {
* @throws LlapStatusCliException
*/
private ExitCode processAppReport(ApplicationReport appReport,
- AppStatusBuilder appStatusBuilder) throws
- LlapStatusCliException {
+ AppStatusBuilder appStatusBuilder) throws LlapStatusCliException {
if (appReport == null) {
- appStatusBuilder.setState(LlapStatusHelpers.State.APP_NOT_FOUND);
+ appStatusBuilder.setState(State.APP_NOT_FOUND);
LOG.info("No Application Found");
return ExitCode.SUCCESS;
}
- // TODO Maybe add the YARN URL for the app.
appStatusBuilder.setAmInfo(
- new LlapStatusHelpers.AmInfo().setAppName(appReport.getName()).setAppType(appReport.getApplicationType()));
+ new AmInfo().setAppName(appReport.getName()).setAppType(appReport.getApplicationType()));
appStatusBuilder.setAppStartTime(appReport.getStartTime());
switch (appReport.getYarnApplicationState()) {
case NEW:
case NEW_SAVING:
case SUBMITTED:
- appStatusBuilder.setState(LlapStatusHelpers.State.LAUNCHING);
+ appStatusBuilder.setState(State.LAUNCHING);
return ExitCode.SUCCESS;
case ACCEPTED:
appStatusBuilder.maybeCreateAndGetAmInfo().setAppId(appReport.getApplicationId().toString());
- appStatusBuilder.setState(LlapStatusHelpers.State.LAUNCHING);
+ appStatusBuilder.setState(State.LAUNCHING);
return ExitCode.SUCCESS;
case RUNNING:
appStatusBuilder.maybeCreateAndGetAmInfo().setAppId(appReport.getApplicationId().toString());
@@ -403,13 +371,7 @@ public class LlapStatusServiceDriver {
case KILLED:
appStatusBuilder.maybeCreateAndGetAmInfo().setAppId(appReport.getApplicationId().toString());
appStatusBuilder.setAppFinishTime(appReport.getFinishTime());
- appStatusBuilder.setState(LlapStatusHelpers.State.COMPLETE);
- ApplicationDiagnostics appDiagnostics = LlapSliderUtils.getApplicationDiagnosticsFromYarnDiagnostics(appReport, LOG);
- if (appDiagnostics == null) {
- LOG.warn("AppDiagnostics not available for YARN application report");
- } else {
- processAppDiagnostics(appStatusBuilder, appDiagnostics, true);
- }
+ appStatusBuilder.setState(State.COMPLETE);
return ExitCode.SUCCESS;
default:
throw new LlapStatusCliException(ExitCode.INTERNAL_ERROR,
@@ -418,11 +380,7 @@ public class LlapStatusServiceDriver {
}
-
-
-
/**
- * Populates information from SliderStatus.
*
* @param appName
* @param sliderClient
@@ -430,7 +388,7 @@ public class LlapStatusServiceDriver {
* @return an ExitCode. An ExitCode other than ExitCode.SUCCESS implies future progress not possible
* @throws LlapStatusCliException
*/
- private ExitCode populateAppStatusFromSliderStatus(String appName, SliderClient sliderClient, AppStatusBuilder appStatusBuilder) throws
+ private ExitCode populateAppStatusFromSlider(String appName, SliderClient sliderClient, AppStatusBuilder appStatusBuilder) throws
LlapStatusCliException {
ClusterDescription clusterDescription;
@@ -492,10 +450,9 @@ public class LlapStatusServiceDriver {
String host = (String) containerParams.get("host");
- LlapInstance
- llapInstance = new LlapInstance(host, containerIdString);
+ LlapInstance llapInstance = new LlapInstance(host, containerIdString);
- appStatusBuilder.addNewRunningLlapInstance(llapInstance);
+ appStatusBuilder.addNewLlapInstance(llapInstance);
}
}
@@ -507,45 +464,8 @@ public class LlapStatusServiceDriver {
}
}
- /**
- * Populates information based on the slider diagnostics call. Must be invoked
- * after populating status from slider status.
- * @param appName
- * @param sliderClient
- * @param appStatusBuilder
- * @return
- * @throws LlapStatusCliException
- */
- private ExitCode populateAppStatusFromSliderDiagnostics(String appName,
- SliderClient sliderClient,
- AppStatusBuilder appStatusBuilder) throws
- LlapStatusCliException {
-
- ApplicationDiagnostics appDiagnostics;
- try {
- ActionDiagnosticArgs args = new ActionDiagnosticArgs();
- args.containers = true;
- args.name = appName;
- appDiagnostics =
- sliderClient.actionDiagnosticContainers(args);
- } catch (YarnException | IOException | URISyntaxException e) {
- throw new LlapStatusCliException(
- ExitCode.SLIDER_CLIENT_ERROR_OTHER,
- "Failed to get container diagnostics from slider", e);
- }
- if (appDiagnostics == null) {
- LOG.info("Slider container diagnostics not available");
- return ExitCode.SLIDER_CLIENT_ERROR_OTHER;
- }
-
- processAppDiagnostics(appStatusBuilder, appDiagnostics, false);
-
- return ExitCode.SUCCESS;
- }
/**
- * Populate additional information for containers from the LLAP registry. Must be invoked
- * after Slider status. Also after slider-diagnostics.
* @param appStatusBuilder
* @return an ExitCode. An ExitCode other than ExitCode.SUCCESS implies future progress not possible
* @throws LlapStatusCliException
@@ -571,12 +491,10 @@ public class LlapStatusServiceDriver {
}
if (serviceInstances == null || serviceInstances.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No information found in the LLAP registry");
- }
+ LOG.info("No information found in the LLAP registry");
appStatusBuilder.setLiveInstances(0);
- appStatusBuilder.setState(LlapStatusHelpers.State.LAUNCHING);
- appStatusBuilder.clearRunningLlapInstances();
+ appStatusBuilder.setState(State.LAUNCHING);
+ appStatusBuilder.clearLlapInstances();
return ExitCode.SUCCESS;
} else {
// Tracks instances known by both slider and llap.
@@ -587,7 +505,7 @@ public class LlapStatusServiceDriver {
String containerIdString = serviceInstance.getProperties().get(
HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
- LlapInstance llapInstance = appStatusBuilder.removeAndGetRunningLlapInstanceForContainer(
+ LlapInstance llapInstance = appStatusBuilder.removeAndgetLlapInstanceForContainer(
containerIdString);
if (llapInstance != null) {
llapInstance.setMgmtPort(serviceInstance.getManagementPort());
@@ -606,185 +524,375 @@ public class LlapStatusServiceDriver {
}
appStatusBuilder.setLiveInstances(validatedInstances.size());
- appStatusBuilder.setLaunchingInstances(llapExtraInstances.size());
if (validatedInstances.size() >= appStatusBuilder.getDesiredInstances()) {
- appStatusBuilder.setState(LlapStatusHelpers.State.RUNNING_ALL);
+ appStatusBuilder.setState(State.RUNNING_ALL);
if (validatedInstances.size() > appStatusBuilder.getDesiredInstances()) {
LOG.warn("Found more entries in LLAP registry, as compared to desired entries");
}
} else {
if (validatedInstances.size() > 0) {
- appStatusBuilder.setState(LlapStatusHelpers.State.RUNNING_PARTIAL);
+ appStatusBuilder.setState(State.RUNNING_PARTIAL);
} else {
- appStatusBuilder.setState(LlapStatusHelpers.State.LAUNCHING);
+ appStatusBuilder.setState(State.LAUNCHING);
}
}
// At this point, everything that can be consumed from AppStatusBuilder has been consumed.
// Debug only
- if (appStatusBuilder.allRunningInstances().size() > 0) {
+ if (appStatusBuilder.allInstances().size() > 0) {
// Containers likely to come up soon.
- LOG.debug("Potential instances starting up: {}", appStatusBuilder.allRunningInstances());
+ LOG.debug("Potential instances starting up: {}", appStatusBuilder.allInstances());
}
if (llapExtraInstances.size() > 0) {
- // Old containers which are likely shutting down, or new containers which
- // launched between slider-status/slider-diagnostics. Skip for this iteration.
+ // Old containers which are likely shutting down
LOG.debug("Instances likely to shutdown soon: {}", llapExtraInstances);
}
- appStatusBuilder.clearAndAddPreviouslyKnownRunningInstances(validatedInstances);
+ appStatusBuilder.clearAndAddPreviouslyKnownInstances(validatedInstances);
}
return ExitCode.SUCCESS;
}
- private static void processAppDiagnostics(AppStatusBuilder appStatusBuilder,
- ApplicationDiagnostics appDiagnostics, boolean appComplete) {
- // For a running app this should be empty.
- String finalMessage = appDiagnostics.getFinalMessage();
- Collection<ContainerInformation> containerInfos =
- appDiagnostics.getContainers();
- appStatusBuilder.setDiagnostics(finalMessage);
- if (containerInfos != null) {
- for (ContainerInformation containerInformation : containerInfos) {
- if (containerInformation.getState() == StateValues.STATE_LIVE && !appComplete) {
- LlapInstance instance = appStatusBuilder
- .removeAndGetCompletedLlapInstanceForContainer(
- containerInformation.getContainerId());
- if (instance ==
- null) { // New launch. Not available during slider status, but available now.
- instance = new LlapInstance(containerInformation.getHost(),
- containerInformation.getContainerId());
- }
- instance.setLogUrl(containerInformation.getLogLink());
- appStatusBuilder.addNewRunningLlapInstance(instance);
- } else if (containerInformation.getState() ==
- StateValues.STATE_STOPPED || appComplete) {
- LlapInstance instance =
- new LlapInstance(containerInformation.getHost(),
- containerInformation.getContainerId());
- instance.setLogUrl(containerInformation.getLogLink());
- if (appComplete && containerInformation.getExitCode() !=
- ContainerExitStatus.INVALID) {
- instance
- .setYarnContainerExitStatus(containerInformation.getExitCode());
- }
- instance.setDiagnostics(containerInformation.getDiagnostics());
- appStatusBuilder.addNewCompleteLlapInstance(instance);
- } else {
- LOG.warn("Unexpected containerstate={}, for container={}",
- containerInformation.getState(), containerInformation);
- }
+ static final class AppStatusBuilder {
+
+ private AmInfo amInfo;
+ private State state = State.UNKNOWN;
+ private String originalConfigurationPath;
+ private String generatedConfigurationPath;
+
+ private int desiredInstances = -1;
+ private int liveInstances = -1;
+
+ private Long appStartTime;
+ private Long appFinishTime;
+
+ private boolean runningThresholdAchieved = false;
+
+ private final List<LlapInstance> llapInstances = new LinkedList<>();
+
+ private transient Map<String, LlapInstance> containerToInstanceMap = new HashMap<>();
+
+ public void setAmInfo(AmInfo amInfo) {
+ this.amInfo = amInfo;
+ }
+
+ public AppStatusBuilder setState(
+ State state) {
+ this.state = state;
+ return this;
+ }
+
+ public AppStatusBuilder setOriginalConfigurationPath(String originalConfigurationPath) {
+ this.originalConfigurationPath = originalConfigurationPath;
+ return this;
+ }
+
+ public AppStatusBuilder setGeneratedConfigurationPath(String generatedConfigurationPath) {
+ this.generatedConfigurationPath = generatedConfigurationPath;
+ return this;
+ }
+
+ public AppStatusBuilder setAppStartTime(long appStartTime) {
+ this.appStartTime = appStartTime;
+ return this;
+ }
+
+ public AppStatusBuilder setAppFinishTime(long finishTime) {
+ this.appFinishTime = finishTime;
+ return this;
+ }
+
+ public AppStatusBuilder setDesiredInstances(int desiredInstances) {
+ this.desiredInstances = desiredInstances;
+ return this;
+ }
+
+ public AppStatusBuilder setLiveInstances(int liveInstances) {
+ this.liveInstances = liveInstances;
+ return this;
+ }
+
+ public AppStatusBuilder addNewLlapInstance(LlapInstance llapInstance) {
+ this.llapInstances.add(llapInstance);
+ this.containerToInstanceMap.put(llapInstance.getContainerId(), llapInstance);
+ return this;
+ }
+
+ public AppStatusBuilder setRunningThresholdAchieved(boolean thresholdAchieved) {
+ this.runningThresholdAchieved = thresholdAchieved;
+ return this;
+ }
+
+ public LlapInstance removeAndgetLlapInstanceForContainer(String containerIdString) {
+ return containerToInstanceMap.remove(containerIdString);
+ }
+
+ public void clearLlapInstances() {
+ this.llapInstances.clear();
+ this.containerToInstanceMap.clear();
+ }
+
+ public AppStatusBuilder clearAndAddPreviouslyKnownInstances(List<LlapInstance> llapInstances) {
+ clearLlapInstances();
+ for (LlapInstance llapInstance : llapInstances) {
+ addNewLlapInstance(llapInstance);
}
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("ContainerInfos is null");
+ return this;
+ }
+
+ @JsonIgnore
+ public List<LlapInstance> allInstances() {
+ return this.llapInstances;
+ }
+
+ public AmInfo getAmInfo() {
+ return amInfo;
+ }
+
+ public State getState() {
+ return state;
+ }
+
+ public String getOriginalConfigurationPath() {
+ return originalConfigurationPath;
+ }
+
+ public String getGeneratedConfigurationPath() {
+ return generatedConfigurationPath;
+ }
+
+ public int getDesiredInstances() {
+ return desiredInstances;
+ }
+
+ public int getLiveInstances() {
+ return liveInstances;
+ }
+
+ public Long getAppStartTime() {
+ return appStartTime;
+ }
+
+ public Long getAppFinishTime() {
+ return appFinishTime;
+ }
+
+ public List<LlapInstance> getLlapInstances() {
+ return llapInstances;
+ }
+
+ public boolean isRunningThresholdAchieved() {
+ return runningThresholdAchieved;
+ }
+
+ @JsonIgnore
+ public AmInfo maybeCreateAndGetAmInfo() {
+ if (amInfo == null) {
+ amInfo = new AmInfo();
}
+ return amInfo;
+ }
+
+ @Override
+ public String toString() {
+ return "AppStatusBuilder{" +
+ "amInfo=" + amInfo +
+ ", state=" + state +
+ ", originalConfigurationPath='" + originalConfigurationPath + '\'' +
+ ", generatedConfigurationPath='" + generatedConfigurationPath + '\'' +
+ ", desiredInstances=" + desiredInstances +
+ ", liveInstances=" + liveInstances +
+ ", appStartTime=" + appStartTime +
+ ", appFinishTime=" + appFinishTime +
+ ", llapInstances=" + llapInstances +
+ ", containerToInstanceMap=" + containerToInstanceMap +
+ '}';
}
}
- private static String constructCompletedContainerDiagnostics(List<LlapInstance> completedInstances) {
- StringBuilder sb = new StringBuilder();
- if (completedInstances == null || completedInstances.size() == 0) {
- return "";
- } else {
- // TODO HIVE-15865 Ideally sort these by completion time, once that is available.
- boolean isFirst = true;
- for (LlapInstance instance : completedInstances) {
- if (!isFirst) {
- sb.append("\n");
- } else {
- isFirst = false;
- }
+ static class AmInfo {
+ private String appName;
+ private String appType;
+ private String appId;
+ private String containerId;
+ private String hostname;
+ private String amWebUrl;
+
+ public AmInfo setAppName(String appName) {
+ this.appName = appName;
+ return this;
+ }
- if (instance.getYarnContainerExitStatus() ==
- ContainerExitStatus.KILLED_EXCEEDED_PMEM ||
- instance.getYarnContainerExitStatus() ==
- ContainerExitStatus.KILLED_EXCEEDED_VMEM) {
- sb.append("\tKILLED container (by YARN for exceeding memory limits): ");
- } else {
- // TODO HIVE-15865 Handle additional reasons like OS launch failed (Slider needs to give this info)
- sb.append("\tFAILED container: ");
- }
- sb.append(" ").append(instance.getContainerId());
- sb.append(", Logs at: ").append(instance.getLogUrl());
- }
+ public AmInfo setAppType(String appType) {
+ this.appType = appType;
+ return this;
+ }
+
+ public AmInfo setAppId(String appId) {
+ this.appId = appId;
+ return this;
+ }
+
+ public AmInfo setContainerId(String containerId) {
+ this.containerId = containerId;
+ return this;
+ }
+
+ public AmInfo setHostname(String hostname) {
+ this.hostname = hostname;
+ return this;
+ }
+
+ public AmInfo setAmWebUrl(String amWebUrl) {
+ this.amWebUrl = amWebUrl;
+ return this;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+
+ public String getAppType() {
+ return appType;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public String getContainerId() {
+ return containerId;
+ }
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public String getAmWebUrl() {
+ return amWebUrl;
+ }
+
+ @Override
+ public String toString() {
+ return "AmInfo{" +
+ "appName='" + appName + '\'' +
+ ", appType='" + appType + '\'' +
+ ", appId='" + appId + '\'' +
+ ", containerId='" + containerId + '\'' +
+ ", hostname='" + hostname + '\'' +
+ ", amWebUrl='" + amWebUrl + '\'' +
+ '}';
}
- return sb.toString();
}
- /**
- * Helper method to construct a diagnostic message from a complete
- * AppStatusBuilder.
- *
- * @return
- */
- private static String constructDiagnostics(
- AppStatusBuilder appStatusBuilder) {
- StringBuilder sb = new StringBuilder();
-
- switch (appStatusBuilder.getState()) {
- case APP_NOT_FOUND:
- sb.append("LLAP status unknown. Awaiting app launch");
- break;
- case LAUNCHING:
- // This is a catch all state - when containers have not started yet, or LLAP has not started yet.
- if (StringUtils.isNotBlank(appStatusBuilder.getAmInfo().getAppId())) {
- sb.append("LLAP Starting up with AppId=")
- .append(appStatusBuilder.getAmInfo().getAppId()).append(".");
- if (appStatusBuilder.getDesiredInstances() != null) {
- sb.append(" Started 0/").append(appStatusBuilder.getDesiredInstances()).append(" instances");
- }
+ static class LlapInstance {
+ private final String hostname;
+ private final String containerId;
+ private String statusUrl;
+ private String webUrl;
+ private Integer rpcPort;
+ private Integer mgmtPort;
+ private Integer shufflePort;
- String containerDiagnostics = constructCompletedContainerDiagnostics(
- appStatusBuilder.getCompletedInstances());
- if (StringUtils.isNotEmpty(containerDiagnostics)) {
- sb.append("\n").append(containerDiagnostics);
- }
- } else {
- sb.append("Awaiting LLAP startup");
- }
- break;
- case RUNNING_PARTIAL:
- sb.append("LLAP Starting up with ApplicationId=")
- .append(appStatusBuilder.getAmInfo().getAppId());
- sb.append(" Started").append(appStatusBuilder.getLiveInstances())
- .append("/").append(appStatusBuilder.getDesiredInstances())
- .append(" instances");
- String containerDiagnostics = constructCompletedContainerDiagnostics(
- appStatusBuilder.getCompletedInstances());
- if (StringUtils.isNotEmpty(containerDiagnostics)) {
- sb.append("\n").append(containerDiagnostics);
- }
+ // TODO HIVE-13454 Add additional information such as #executors, container size, etc
- // TODO HIVE-15865: Include information about pending requests, and last allocation time
- // once Slider provides this information.
- break;
- case RUNNING_ALL:
- sb.append("LLAP Application running with ApplicationId=")
- .append(appStatusBuilder.getAmInfo().getAppId());
- break;
- case COMPLETE:
-
- sb.append("LLAP Application already complete. ApplicationId=")
- .append(appStatusBuilder.getAmInfo().getAppId());
- containerDiagnostics = constructCompletedContainerDiagnostics(
- appStatusBuilder.getCompletedInstances());
- if (StringUtils.isNotEmpty(containerDiagnostics)) {
- sb.append("\n").append(containerDiagnostics);
- }
+ public LlapInstance(String hostname, String containerId) {
+ this.hostname = hostname;
+ this.containerId = containerId;
+ }
+
+ public LlapInstance setWebUrl(String webUrl) {
+ this.webUrl = webUrl;
+ return this;
+ }
+
+ public LlapInstance setStatusUrl(String statusUrl) {
+ this.statusUrl = statusUrl;
+ return this;
+ }
+
+ public LlapInstance setRpcPort(int rpcPort) {
+ this.rpcPort = rpcPort;
+ return this;
+ }
+
+ public LlapInstance setMgmtPort(int mgmtPort) {
+ this.mgmtPort = mgmtPort;
+ return this;
+ }
- break;
- case UNKNOWN:
- sb.append("LLAP status unknown");
- break;
+ public LlapInstance setShufflePort(int shufflePort) {
+ this.shufflePort = shufflePort;
+ return this;
}
- if (StringUtils.isNotBlank(appStatusBuilder.getDiagnostics())) {
- sb.append("\n").append(appStatusBuilder.getDiagnostics());
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public String getStatusUrl() {
+ return statusUrl;
+ }
+
+ public String getContainerId() {
+ return containerId;
+ }
+
+ public String getWebUrl() {
+ return webUrl;
+ }
+
+ public Integer getRpcPort() {
+ return rpcPort;
+ }
+
+ public Integer getMgmtPort() {
+ return mgmtPort;
+ }
+
+ public Integer getShufflePort() {
+ return shufflePort;
}
- return sb.toString();
+ @Override
+ public String toString() {
+ return "LlapInstance{" +
+ "hostname='" + hostname + '\'' +
+ ", containerId='" + containerId + '\'' +
+ ", statusUrl='" + statusUrl + '\'' +
+ ", webUrl='" + webUrl + '\'' +
+ ", rpcPort=" + rpcPort +
+ ", mgmtPort=" + mgmtPort +
+ ", shufflePort=" + shufflePort +
+ '}';
+ }
+ }
+
+ static class LlapStatusCliException extends Exception {
+ final ExitCode exitCode;
+
+
+ public LlapStatusCliException(ExitCode exitCode, String message) {
+ super(exitCode.getInt() +": " + message);
+ this.exitCode = exitCode;
+ }
+
+ public LlapStatusCliException(ExitCode exitCode, String message, Throwable cause) {
+ super(message, cause);
+ this.exitCode = exitCode;
+ }
+
+ public ExitCode getExitCode() {
+ return exitCode;
+ }
+ }
+
+ enum State {
+ APP_NOT_FOUND, LAUNCHING,
+ RUNNING_PARTIAL,
+ RUNNING_ALL, COMPLETE, UNKNOWN
}
public enum ExitCode {
@@ -810,26 +918,6 @@ public class LlapStatusServiceDriver {
}
- public static class LlapStatusCliException extends Exception {
- final LlapStatusServiceDriver.ExitCode exitCode;
-
-
- public LlapStatusCliException(LlapStatusServiceDriver.ExitCode exitCode, String message) {
- super(exitCode.getInt() +": " + message);
- this.exitCode = exitCode;
- }
-
- public LlapStatusCliException(LlapStatusServiceDriver.ExitCode exitCode, String message, Throwable cause) {
- super(message, cause);
- this.exitCode = exitCode;
- }
-
- public LlapStatusServiceDriver.ExitCode getExitCode() {
- return exitCode;
- }
- }
-
-
private static void logError(Throwable t) {
LOG.error("FAILED: " + t.getMessage(), t);
System.err.println("FAILED: " + t.getMessage());
@@ -839,9 +927,6 @@ public class LlapStatusServiceDriver {
public static void main(String[] args) {
LOG.info("LLAP status invoked with arguments = {}", Arrays.toString(args));
int ret = ExitCode.SUCCESS.getInt();
- Clock clock = new SystemClock();
- long startTime = clock.getTime();
- long lastSummaryLogTime = -1;
LlapStatusServiceDriver statusServiceDriver = null;
LlapStatusOptions options = null;
@@ -852,8 +937,7 @@ public class LlapStatusServiceDriver {
statusServiceDriver.close();
logError(t);
if (t instanceof LlapStatusCliException) {
- LlapStatusCliException
- ce = (LlapStatusCliException) t;
+ LlapStatusCliException ce = (LlapStatusCliException) t;
ret = ce.getExitCode().getInt();
} else {
ret = ExitCode.INTERNAL_ERROR.getInt();
@@ -866,14 +950,12 @@ public class LlapStatusServiceDriver {
System.exit(ret);
}
- boolean firstAttempt = true;
final long refreshInterval = options.getRefreshIntervalMs();
final boolean watchMode = options.isWatchMode();
final long watchTimeout = options.getWatchTimeoutMs();
long numAttempts = watchTimeout / refreshInterval;
- numAttempts = watchMode ? numAttempts : 1; // Break out of the loop fast if watchMode is disabled.
- LlapStatusHelpers.State launchingState = null;
- LlapStatusHelpers.State currentState = null;
+ State launchingState = null;
+ State currentState = null;
boolean desiredStateAttained = false;
final float runningNodesThreshold = options.getRunningNodesThreshold();
try (OutputStream os = options.getOutputFile() == null ? System.out :
@@ -887,62 +969,28 @@ public class LlapStatusServiceDriver {
numAttempts, watchMode, new DecimalFormat("#.###").format(runningNodesThreshold));
while (numAttempts > 0) {
try {
- if (!firstAttempt) {
- if (watchMode) {
- try {
- Thread.sleep(refreshInterval);
- } catch (InterruptedException e) {
- // ignore
- }
- } else {
- // reported once, so break
- break;
- }
- } else {
- firstAttempt = false;
- }
ret = statusServiceDriver.run(options, watchMode ? watchTimeout : 0);
- currentState = statusServiceDriver.appStatusBuilder.getState();
- try {
- lastSummaryLogTime = LlapStatusServiceDriver
- .maybeLogSummary(clock, lastSummaryLogTime, statusServiceDriver,
- watchMode, watchTimeout, launchingState);
- } catch (Exception e) {
- LOG.warn("Failed to log summary", e);
- }
-
if (ret == ExitCode.SUCCESS.getInt()) {
if (watchMode) {
+ currentState = statusServiceDriver.appStatusBuilder.state;
// slider has started llap application, now if for some reason state changes to COMPLETE then fail fast
if (launchingState == null &&
- (EnumSet.of(LlapStatusHelpers.State.LAUNCHING,
- LlapStatusHelpers.State.RUNNING_PARTIAL,
- LlapStatusHelpers.State.RUNNING_ALL)
- .contains(currentState))) {
+ (currentState.equals(State.LAUNCHING) || currentState.equals(State.RUNNING_PARTIAL))) {
launchingState = currentState;
}
- if (launchingState != null && currentState.equals(
- LlapStatusHelpers.State.COMPLETE)) {
+ if (launchingState != null && currentState.equals(State.COMPLETE)) {
LOG.warn("Application stopped while launching. COMPLETE state reached while waiting for RUNNING state."
+ " Failing " + "fast..");
break;
}
- if (!(currentState.equals(LlapStatusHelpers.State.RUNNING_PARTIAL) || currentState.equals(
- LlapStatusHelpers.State.RUNNING_ALL))) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Current state: {}. Desired state: {}. {}/{} instances.",
- currentState,
- runningNodesThreshold == 1.0f ?
- LlapStatusHelpers.State.RUNNING_ALL :
- LlapStatusHelpers.State.RUNNING_PARTIAL,
- statusServiceDriver.appStatusBuilder.getLiveInstances(),
- statusServiceDriver.appStatusBuilder
- .getDesiredInstances());
- }
+ if (!(currentState.equals(State.RUNNING_PARTIAL) || currentState.equals(State.RUNNING_ALL))) {
+ LOG.warn("Current state: {}. Desired state: {}. {}/{} instances.", currentState,
+ runningNodesThreshold == 1.0f ? State.RUNNING_ALL : State.RUNNING_PARTIAL,
+ statusServiceDriver.appStatusBuilder.getLiveInstances(),
+ statusServiceDriver.appStatusBuilder.getDesiredInstances());
numAttempts--;
continue;
}
@@ -953,17 +1001,11 @@ public class LlapStatusServiceDriver {
if (desiredInstances > 0) {
final float ratio = (float) liveInstances / (float) desiredInstances;
if (ratio < runningNodesThreshold) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Waiting until running nodes threshold is reached. Current: {} Desired: {}." +
- " {}/{} instances.",
- new DecimalFormat("#.###").format(ratio),
- new DecimalFormat("#.###")
- .format(runningNodesThreshold),
- statusServiceDriver.appStatusBuilder.getLiveInstances(),
- statusServiceDriver.appStatusBuilder
- .getDesiredInstances());
- }
+ LOG.warn("Waiting until running nodes threshold is reached. Current: {} Desired: {}." +
+ " {}/{} instances.", new DecimalFormat("#.###").format(ratio),
+ new DecimalFormat("#.###").format(runningNodesThreshold),
+ statusServiceDriver.appStatusBuilder.getLiveInstances(),
+ statusServiceDriver.appStatusBuilder.getDesiredInstances());
numAttempts--;
continue;
} else {
@@ -994,14 +1036,18 @@ public class LlapStatusServiceDriver {
}
break;
} finally {
- // TODO Remove this before commit.
+ if (watchMode) {
+ try {
+ Thread.sleep(refreshInterval);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ } else {
+ // reported once, so break
+ break;
+ }
}
}
- // Log final state to CONSOLE_LOGGER
- LlapStatusServiceDriver
- .maybeLogSummary(clock, 0L, statusServiceDriver,
- watchMode, watchTimeout, launchingState);
- CONSOLE_LOGGER.info("\n\n\n");
// print current state before exiting
statusServiceDriver.outputJson(pw);
os.flush();
@@ -1013,8 +1059,7 @@ public class LlapStatusServiceDriver {
} catch (Throwable t) {
logError(t);
if (t instanceof LlapStatusCliException) {
- LlapStatusCliException
- ce = (LlapStatusCliException) t;
+ LlapStatusCliException ce = (LlapStatusCliException) t;
ret = ce.getExitCode().getInt();
} else {
ret = ExitCode.INTERNAL_ERROR.getInt();
@@ -1029,40 +1074,6 @@ public class LlapStatusServiceDriver {
System.exit(ret);
}
- private static long maybeLogSummary(Clock clock, long lastSummaryLogTime,
- LlapStatusServiceDriver statusServiceDriver,
- boolean watchMode, long watchTimeout, LlapStatusHelpers.State launchingState) {
- long currentTime = clock.getTime();
- if (lastSummaryLogTime < currentTime - LOG_SUMMARY_INTERVAL) {
- String diagString = null;
- if (launchingState == null && statusServiceDriver.appStatusBuilder.getState() ==
- LlapStatusHelpers.State.COMPLETE && watchMode) {
- // First known state was COMPLETED. Wait for the app launch to start.
- diagString = "Awaiting LLAP launch";
- // Clear completed instances in this case. Don't want to provide information from the previous run.
- statusServiceDriver.appStatusBuilder.clearCompletedLlapInstances();
- } else {
- diagString = constructDiagnostics(statusServiceDriver.appStatusBuilder);
- }
-
- if (lastSummaryLogTime == -1) {
- if (watchMode) {
- CONSOLE_LOGGER.info("\nLLAPSTATUS WatchMode with timeout={} s",
- TimeUnit.SECONDS.convert(watchTimeout, TimeUnit.MILLISECONDS));
- } else {
- CONSOLE_LOGGER.info("\nLLAPSTATUS");
- }
- CONSOLE_LOGGER.info(
- "--------------------------------------------------------------------------------");
- }
- CONSOLE_LOGGER.info(diagString);
- CONSOLE_LOGGER.info(
- "--------------------------------------------------------------------------------");
- lastSummaryLogTime = currentTime;
- }
- return lastSummaryLogTime;
- }
-
private void close() {
if (sliderClient != null) {
sliderClient.stop();
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/llap-server/src/java/org/apache/hadoop/hive/llap/cli/status/LlapStatusHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/status/LlapStatusHelpers.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/status/LlapStatusHelpers.java
deleted file mode 100644
index 187f4c3..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/status/LlapStatusHelpers.java
+++ /dev/null
@@ -1,449 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.cli.status;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.hive.llap.cli.LlapStatusServiceDriver;
-import org.codehaus.jackson.annotate.JsonIgnore;
-
-public class LlapStatusHelpers {
- public enum State {
- APP_NOT_FOUND, LAUNCHING,
- RUNNING_PARTIAL,
- RUNNING_ALL, COMPLETE, UNKNOWN
- }
-
- public static class AmInfo {
- private String appName;
- private String appType;
- private String appId;
- private String containerId;
- private String hostname;
- private String amWebUrl;
-
- public AmInfo setAppName(String appName) {
- this.appName = appName;
- return this;
- }
-
- public AmInfo setAppType(String appType) {
- this.appType = appType;
- return this;
- }
-
- public AmInfo setAppId(String appId) {
- this.appId = appId;
- return this;
- }
-
- public AmInfo setContainerId(String containerId) {
- this.containerId = containerId;
- return this;
- }
-
- public AmInfo setHostname(String hostname) {
- this.hostname = hostname;
- return this;
- }
-
- public AmInfo setAmWebUrl(String amWebUrl) {
- this.amWebUrl = amWebUrl;
- return this;
- }
-
- public String getAppName() {
- return appName;
- }
-
- public String getAppType() {
- return appType;
- }
-
- public String getAppId() {
- return appId;
- }
-
- public String getContainerId() {
- return containerId;
- }
-
- public String getHostname() {
- return hostname;
- }
-
- public String getAmWebUrl() {
- return amWebUrl;
- }
-
- @Override
- public String toString() {
- return "AmInfo{" +
- "appName='" + appName + '\'' +
- ", appType='" + appType + '\'' +
- ", appId='" + appId + '\'' +
- ", containerId='" + containerId + '\'' +
- ", hostname='" + hostname + '\'' +
- ", amWebUrl='" + amWebUrl + '\'' +
- '}';
- }
- }
-
- public static class LlapInstance {
- private final String hostname;
- private final String containerId;
- private String logUrl;
-
- // Only for live instances.
- private String statusUrl;
- private String webUrl;
- private Integer rpcPort;
- private Integer mgmtPort;
- private Integer shufflePort;
-
- // For completed instances
- private String diagnostics;
- private int yarnContainerExitStatus;
-
- // TODO HIVE-13454 Add additional information such as #executors, container size, etc
-
- public LlapInstance(String hostname, String containerId) {
- this.hostname = hostname;
- this.containerId = containerId;
- }
-
- public LlapInstance setLogUrl(String logUrl) {
- this.logUrl = logUrl;
- return this;
- }
-
- public LlapInstance setWebUrl(String webUrl) {
- this.webUrl = webUrl;
- return this;
- }
-
- public LlapInstance setStatusUrl(String statusUrl) {
- this.statusUrl = statusUrl;
- return this;
- }
-
- public LlapInstance setRpcPort(int rpcPort) {
- this.rpcPort = rpcPort;
- return this;
- }
-
- public LlapInstance setMgmtPort(int mgmtPort) {
- this.mgmtPort = mgmtPort;
- return this;
- }
-
- public LlapInstance setShufflePort(int shufflePort) {
- this.shufflePort = shufflePort;
- return this;
- }
-
- public LlapInstance setDiagnostics(String diagnostics) {
- this.diagnostics = diagnostics;
- return this;
- }
-
- public LlapInstance setYarnContainerExitStatus(int yarnContainerExitStatus) {
- this.yarnContainerExitStatus = yarnContainerExitStatus;
- return this;
- }
-
- public String getHostname() {
- return hostname;
- }
-
- public String getLogUrl() {
- return logUrl;
- }
-
- public String getStatusUrl() {
- return statusUrl;
- }
-
- public String getContainerId() {
- return containerId;
- }
-
- public String getWebUrl() {
- return webUrl;
- }
-
- public Integer getRpcPort() {
- return rpcPort;
- }
-
- public Integer getMgmtPort() {
- return mgmtPort;
- }
-
- public Integer getShufflePort() {
- return shufflePort;
- }
-
- public String getDiagnostics() {
- return diagnostics;
- }
-
- public int getYarnContainerExitStatus() {
- return yarnContainerExitStatus;
- }
-
- @Override
- public String toString() {
- return "LlapInstance{" +
- "hostname='" + hostname + '\'' +
- "logUrl=" + logUrl + '\'' +
- ", containerId='" + containerId + '\'' +
- ", statusUrl='" + statusUrl + '\'' +
- ", webUrl='" + webUrl + '\'' +
- ", rpcPort=" + rpcPort +
- ", mgmtPort=" + mgmtPort +
- ", shufflePort=" + shufflePort +
- ", diagnostics=" + diagnostics +
- ", yarnContainerExitStatus=" + yarnContainerExitStatus +
- '}';
- }
- }
-
- public static final class AppStatusBuilder {
-
- private AmInfo amInfo;
- private State state = State.UNKNOWN;
- private String diagnostics;
- private String originalConfigurationPath;
- private String generatedConfigurationPath;
-
- private Integer desiredInstances = null;
- private Integer liveInstances = null;
- private Integer launchingInstances = null;
-
-
- private Long appStartTime;
- private Long appFinishTime;
-
- private boolean runningThresholdAchieved = false;
-
- private final List<LlapInstance> runningInstances = new LinkedList<>();
- private final List<LlapInstance> completedInstances = new LinkedList<>();
-
- private transient final Map<String, LlapInstance>
- containerToRunningInstanceMap = new HashMap<>();
- private transient final Map<String, LlapInstance>
- containerToCompletedInstanceMap = new HashMap<>();
-
- public void setAmInfo(AmInfo amInfo) {
- this.amInfo = amInfo;
- }
-
- public AppStatusBuilder setState(
- State state) {
- this.state = state;
- return this;
- }
-
- public AppStatusBuilder setDiagnostics(String diagnostics) {
- this.diagnostics = diagnostics;
- return this;
- }
-
- public AppStatusBuilder setOriginalConfigurationPath(String originalConfigurationPath) {
- this.originalConfigurationPath = originalConfigurationPath;
- return this;
- }
-
- public AppStatusBuilder setGeneratedConfigurationPath(String generatedConfigurationPath) {
- this.generatedConfigurationPath = generatedConfigurationPath;
- return this;
- }
-
- public AppStatusBuilder setAppStartTime(long appStartTime) {
- this.appStartTime = appStartTime;
- return this;
- }
-
- public AppStatusBuilder setAppFinishTime(long finishTime) {
- this.appFinishTime = finishTime;
- return this;
- }
-
- public void setRunningThresholdAchieved(boolean runningThresholdAchieved) {
- this.runningThresholdAchieved = runningThresholdAchieved;
- }
-
- public AppStatusBuilder setDesiredInstances(int desiredInstances) {
- this.desiredInstances = desiredInstances;
- return this;
- }
-
- public AppStatusBuilder setLiveInstances(int liveInstances) {
- this.liveInstances = liveInstances;
- return this;
- }
-
- public AppStatusBuilder setLaunchingInstances(int launchingInstances) {
- this.launchingInstances = launchingInstances;
- return this;
- }
-
- public AppStatusBuilder addNewRunningLlapInstance(LlapInstance llapInstance) {
- this.runningInstances.add(llapInstance);
- this.containerToRunningInstanceMap
- .put(llapInstance.getContainerId(), llapInstance);
- return this;
- }
-
- public LlapInstance removeAndGetRunningLlapInstanceForContainer(String containerIdString) {
- return containerToRunningInstanceMap.remove(containerIdString);
- }
-
- public void clearRunningLlapInstances() {
- this.runningInstances.clear();
- this.containerToRunningInstanceMap.clear();
- }
-
- public AppStatusBuilder clearAndAddPreviouslyKnownRunningInstances(List<LlapInstance> llapInstances) {
- clearRunningLlapInstances();
- for (LlapInstance llapInstance : llapInstances) {
- addNewRunningLlapInstance(llapInstance);
- }
- return this;
- }
-
- @JsonIgnore
- public List<LlapInstance> allRunningInstances() {
- return this.runningInstances;
- }
-
- public AppStatusBuilder addNewCompleteLlapInstance(LlapInstance llapInstance) {
- this.completedInstances.add(llapInstance);
- this.containerToCompletedInstanceMap
- .put(llapInstance.getContainerId(), llapInstance);
- return this;
- }
-
- public LlapInstance removeAndGetCompletedLlapInstanceForContainer(String containerIdString) {
- return containerToCompletedInstanceMap.remove(containerIdString);
- }
-
- public void clearCompletedLlapInstances() {
- this.completedInstances.clear();
- this.containerToCompletedInstanceMap.clear();
- }
-
- public AppStatusBuilder clearAndAddPreviouslyKnownCompletedInstances(List<LlapInstance> llapInstances) {
- clearCompletedLlapInstances();
- for (LlapInstance llapInstance : llapInstances) {
- addNewCompleteLlapInstance(llapInstance);
- }
- return this;
- }
-
- @JsonIgnore
- public List<LlapInstance> allCompletedInstances() {
- return this.completedInstances;
- }
-
- public AmInfo getAmInfo() {
- return amInfo;
- }
-
- public State getState() {
- return state;
- }
-
- public String getDiagnostics() {
- return diagnostics;
- }
-
- public String getOriginalConfigurationPath() {
- return originalConfigurationPath;
- }
-
- public String getGeneratedConfigurationPath() {
- return generatedConfigurationPath;
- }
-
- public Integer getDesiredInstances() {
- return desiredInstances;
- }
-
- public Integer getLiveInstances() {
- return liveInstances;
- }
-
- public Integer getLaunchingInstances() {
- return launchingInstances;
- }
-
- public Long getAppStartTime() {
- return appStartTime;
- }
-
- public Long getAppFinishTime() {
- return appFinishTime;
- }
-
- public boolean isRunningThresholdAchieved() {
- return runningThresholdAchieved;
- }
-
- public List<LlapInstance> getRunningInstances() {
- return runningInstances;
- }
-
- public List<LlapInstance> getCompletedInstances() {
- return completedInstances;
- }
-
- @JsonIgnore
- public AmInfo maybeCreateAndGetAmInfo() {
- if (amInfo == null) {
- amInfo = new AmInfo();
- }
- return amInfo;
- }
-
- @Override
- public String toString() {
- return "AppStatusBuilder{" +
- "amInfo=" + amInfo +
- ", state=" + state +
- ", diagnostics=" + diagnostics +
- ", originalConfigurationPath='" + originalConfigurationPath + '\'' +
- ", generatedConfigurationPath='" + generatedConfigurationPath + '\'' +
- ", desiredInstances=" + desiredInstances +
- ", liveInstances=" + liveInstances +
- ", launchingInstances=" + launchingInstances +
- ", appStartTime=" + appStartTime +
- ", appFinishTime=" + appFinishTime +
- ", runningThresholdAchieved=" + runningThresholdAchieved +
- ", runningInstances=" + runningInstances +
- ", completedInstances=" + completedInstances +
- ", containerToRunningInstanceMap=" + containerToRunningInstanceMap +
- '}';
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java b/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
index 7219d36..88f3b19 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
@@ -36,7 +36,7 @@ public class LlapDaemonConfiguration extends Configuration {
public static final String[] SSL_DAEMON_CONFIGS = { "ssl-server.xml" };
public LlapDaemonConfiguration() {
- super(true); // Load the defaults.
+ super(false);
for (String conf : DAEMON_CONFIGS) {
addResource(conf);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index e030a76..82bbcf3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -524,8 +524,4 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
return queryId + "-" + dagIndex;
}
- public int getNumActive() {
- return executorService.getNumActive();
- }
-
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
index 8fe59d4..a80bb9b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
@@ -53,11 +53,6 @@ public class EvictingPriorityBlockingQueue<E> {
currentSize++;
return null;
} else {
- if (isEmpty()) {
- // Empty queue. But no capacity available, due to waitQueueSize and additionalElementsAllowed
- // Return the element.
- return e;
- }
// No capacity. Check if an element needs to be evicted.
E last = deque.peekLast();
if (comparator.compare(e, last) < 0) {
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index aae146e..95bc675 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -15,17 +15,14 @@
package org.apache.hadoop.hive.llap.daemon.impl;
import org.apache.hadoop.hive.llap.LlapOutputFormatService;
-
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.net.InetSocketAddress;
import java.net.URL;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -41,7 +38,6 @@ import org.apache.hadoop.hive.common.UgiFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.DaemonId;
-import org.apache.hadoop.hive.llap.LlapDaemonInfo;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
@@ -177,33 +173,28 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE);
boolean enablePreemption = HiveConf.getBoolVar(
daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION);
- final String logMsg = "Attempting to start LlapDaemon with the following configuration: " +
- "maxJvmMemory=" + maxJvmMemory + " ("
- + LlapUtil.humanReadableByteCount(maxJvmMemory) + ")" +
- ", requestedExecutorMemory=" + executorMemoryBytes +
- " (" + LlapUtil.humanReadableByteCount(executorMemoryBytes) + ")" +
- ", llapIoCacheSize=" + ioMemoryBytes + " ("
- + LlapUtil.humanReadableByteCount(ioMemoryBytes) + ")" +
- ", xmxHeadRoomMemory=" + xmxHeadRoomBytes + " ("
- + LlapUtil.humanReadableByteCount(xmxHeadRoomBytes) + ")" +
- ", adjustedExecutorMemory=" + executorMemoryPerInstance +
- " (" + LlapUtil.humanReadableByteCount(executorMemoryPerInstance) + ")" +
- ", numExecutors=" + numExecutors +
- ", llapIoEnabled=" + ioEnabled +
- ", llapIoCacheIsDirect=" + isDirectCache +
- ", rpcListenerPort=" + srvPort +
- ", mngListenerPort=" + mngPort +
- ", webPort=" + webPort +
- ", outputFormatSvcPort=" + outputFormatServicePort +
- ", workDirs=" + Arrays.toString(localDirs) +
- ", shufflePort=" + shufflePort +
- ", waitQueueSize= " + waitQueueSize +
- ", enablePreemption= " + enablePreemption;
- LOG.info(logMsg);
- final String currTSISO8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(new Date());
- // Time based log retrieval may not fetch the above log line so logging to stderr for debugging purpose.
- System.err.println(currTSISO8601 + " " + logMsg);
-
+ LOG.warn("Attempting to start LlapDaemonConf with the following configuration: " +
+ "maxJvmMemory=" + maxJvmMemory + " ("
+ + LlapUtil.humanReadableByteCount(maxJvmMemory) + ")" +
+ ", requestedExecutorMemory=" + executorMemoryBytes +
+ " (" + LlapUtil.humanReadableByteCount(executorMemoryBytes) + ")" +
+ ", llapIoCacheSize=" + ioMemoryBytes + " ("
+ + LlapUtil.humanReadableByteCount(ioMemoryBytes) + ")" +
+ ", xmxHeadRoomMemory=" + xmxHeadRoomBytes + " ("
+ + LlapUtil.humanReadableByteCount(xmxHeadRoomBytes) + ")" +
+ ", adjustedExecutorMemory=" + executorMemoryPerInstance +
+ " (" + LlapUtil.humanReadableByteCount(executorMemoryPerInstance) + ")" +
+ ", numExecutors=" + numExecutors +
+ ", llapIoEnabled=" + ioEnabled +
+ ", llapIoCacheIsDirect=" + isDirectCache +
+ ", rpcListenerPort=" + srvPort +
+ ", mngListenerPort=" + mngPort +
+ ", webPort=" + webPort +
+ ", outputFormatSvcPort=" + outputFormatServicePort +
+ ", workDirs=" + Arrays.toString(localDirs) +
+ ", shufflePort=" + shufflePort +
+ ", waitQueueSize= " + waitQueueSize +
+ ", enablePreemption= " + enablePreemption);
long memRequired =
executorMemoryBytes + (ioEnabled && isDirectCache == false ? ioMemoryBytes : 0);
@@ -265,7 +256,8 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
this.metrics.setCacheMemoryPerInstance(ioMemoryBytes);
this.metrics.setJvmMaxMemory(maxJvmMemory);
this.metrics.setWaitQueueSize(waitQueueSize);
- this.metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
+ // TODO: Has to be reverted in HIVE-15644
+ //this.metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
this.llapDaemonInfoBean = MBeans.register("LlapDaemon", "LlapDaemonInfo", this);
LOG.info("Started LlapMetricsSystem with displayName: " + displayName +
" sessionId: " + sessionId);
@@ -344,7 +336,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
System.setProperty("isThreadContextMapInheritable", "true");
Configurator.initialize("LlapDaemonLog4j2", llap_l4j2.toString());
long end = System.currentTimeMillis();
- LOG.debug("LLAP daemon logging initialized from {} in {} ms. Async: {}",
+ LOG.warn("LLAP daemon logging initialized from {} in {} ms. Async: {}",
llap_l4j2, (end - start), async);
} else {
throw new RuntimeException("Log initialization failed." +
@@ -386,7 +378,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
"$$$$$$$$\\ $$$$$$$$\\ $$ | $$ |$$ |\n" +
"\\________|\\________|\\__| \\__|\\__|\n" +
"\n";
- LOG.info("\n\n" + asciiArt);
+ LOG.warn("\n\n" + asciiArt);
}
@Override
@@ -508,6 +500,8 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
nmHost, nmPort);
}
+ int numExecutors = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
+
String workDirsString = System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name());
String localDirList = LlapUtil.getDaemonLocalDirString(daemonConf, workDirsString);
@@ -518,19 +512,16 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
int shufflePort = daemonConf
.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
int webPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_WEB_PORT);
-
- LlapDaemonInfo.initialize(appName, daemonConf);
-
- int numExecutors = LlapDaemonInfo.INSTANCE.getNumExecutors();
- long executorMemoryBytes = LlapDaemonInfo.INSTANCE.getExecutorMemory();
- long ioMemoryBytes = LlapDaemonInfo.INSTANCE.getCacheSize();
- boolean isDirectCache = LlapDaemonInfo.INSTANCE.isDirectCache();
- boolean isLlapIo = LlapDaemonInfo.INSTANCE.isLlapIo();
+ long executorMemoryBytes = HiveConf.getIntVar(
+ daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l;
+ long ioMemoryBytes = HiveConf.getSizeVar(daemonConf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
+ boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT);
+ boolean isLlapIo = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED, true);
LlapDaemon.initializeLogging(daemonConf);
- llapDaemon =
- new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo, isDirectCache,
- ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort, appName);
+ llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo,
+ isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort,
+ appName);
LOG.info("Adding shutdown hook for LlapDaemon");
ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1);
@@ -541,7 +532,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
// Relying on the RPC threads to keep the service alive.
} catch (Throwable t) {
// TODO Replace this with a ExceptionHandler / ShutdownHook
- LOG.error("Failed to start LLAP Daemon with exception", t);
+ LOG.warn("Failed to start LLAP Daemon with exception", t);
if (llapDaemon != null) {
llapDaemon.shutdown();
}
@@ -610,11 +601,6 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
}
@Override
- public int getNumActive() {
- return containerRunner.getNumActive();
- }
-
- @Override
public long getExecutorMemoryPerInstance() {
return executorMemoryPerInstance;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java
index 22cfc9e..d6449db 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java
@@ -40,12 +40,6 @@ public interface LlapDaemonMXBean {
public int getNumExecutors();
/**
- * Gets the number of active executors.
- * @return number of active executors
- */
- public int getNumActive();
-
- /**
* Gets the shuffle port.
* @return the shuffle port
*/
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
index f199593..fd6234a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
@@ -45,7 +45,5 @@ public interface Scheduler<T> {
Set<String> getExecutorsStatus();
- int getNumActive();
-
QueryIdentifier findQueryByFragment(String fragmentId);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 7f8c947..9eaa7d7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -201,20 +201,6 @@ public class TaskExecutorService extends AbstractService
};
@Override
- public int getNumActive() {
- int result = 0;
- for (Map.Entry<String, TaskWrapper> e : knownTasks.entrySet()) {
- TaskWrapper task = e.getValue();
- if (task.isInWaitQueue()) continue;
- TaskRunnerCallable c = task.getTaskRunnerCallable();
- // Count the tasks in intermediate state as waiting.
- if (c == null || c.getStartTime() == 0) continue;
- ++result;
- }
- return result;
- }
-
- @Override
public Set<String> getExecutorsStatus() {
// TODO Change this method to make the output easier to parse (parse programmatically)
Set<String> result = new LinkedHashSet<>();
@@ -291,8 +277,7 @@ public class TaskExecutorService extends AbstractService
}
// If the task cannot finish and if no slots are available then don't schedule it.
// Also don't wait if we have a task and we just killed something to schedule it.
- // (numSlotsAvailable can go negative, if the callback after the thread completes is delayed)
- boolean shouldWait = numSlotsAvailable.get() <= 0 && lastKillTimeMs == null;
+ boolean shouldWait = numSlotsAvailable.get() == 0 && lastKillTimeMs == null;
if (task.getTaskRunnerCallable().canFinish()) {
if (isDebugEnabled) {
LOG.debug("Attempting to schedule task {}, canFinish={}. Current state: "
@@ -743,8 +728,8 @@ public class TaskExecutorService extends AbstractService
knownTasks.remove(taskWrapper.getRequestId());
taskWrapper.setIsInPreemptableQueue(false);
taskWrapper.maybeUnregisterForFinishedStateNotifications();
- updatePreemptionListAndNotify(result.getEndReason());
taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result);
+ updatePreemptionListAndNotify(result.getEndReason());
}
@Override
@@ -757,8 +742,8 @@ public class TaskExecutorService extends AbstractService
knownTasks.remove(taskWrapper.getRequestId());
taskWrapper.setIsInPreemptableQueue(false);
taskWrapper.maybeUnregisterForFinishedStateNotifications();
- updatePreemptionListAndNotify(null);
taskWrapper.getTaskRunnerCallable().getCallback().onFailure(t);
+ updatePreemptionListAndNotify(null);
LOG.error("Failed notification received: Stacktrace: " + ExceptionUtils.getStackTrace(t));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 1669815..c077d75 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -407,7 +407,6 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
taskReporter.shutdown();
}
if (umbilical != null) {
- // TODO: Can this be moved out of the main callback path
RPC.stopProxy(umbilical);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapIoMemoryServlet.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapIoMemoryServlet.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapIoMemoryServlet.java
deleted file mode 100644
index 3386cb4..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapIoMemoryServlet.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.daemon.services.impl;
-
-import java.io.PrintWriter;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.llap.io.api.LlapIo;
-import org.apache.hadoop.hive.llap.io.api.LlapProxy;
-import org.apache.hive.http.HttpServer;
-
-@SuppressWarnings("serial")
-public class LlapIoMemoryServlet extends HttpServlet {
-
- private static final Log LOG = LogFactory.getLog(LlapIoMemoryServlet.class);
- static final String ACCESS_CONTROL_ALLOW_METHODS = "Access-Control-Allow-Methods";
- static final String ACCESS_CONTROL_ALLOW_ORIGIN = "Access-Control-Allow-Origin";
-
- /**
- * Initialize this servlet.
- */
- @Override
- public void init() throws ServletException {
- }
-
- /**
- * Process a GET request for the specified resource.
- *
- * @param request
- * The servlet request we are processing
- * @param response
- * The servlet response we are creating
- */
- @Override
- public void doGet(HttpServletRequest request, HttpServletResponse response) {
- try {
- if (!HttpServer.isInstrumentationAccessAllowed(getServletContext(), request, response)) {
- return;
- }
- PrintWriter writer = null;
-
- try {
- response.setContentType("text/plain; charset=utf8");
- response.setHeader(ACCESS_CONTROL_ALLOW_METHODS, "GET");
- response.setHeader(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
- response.setHeader("Cache-Control", "no-transform,public,max-age=60,s-maxage=60");
-
- writer = response.getWriter();
-
- LlapIo<?> llapIo = LlapProxy.getIo();
- if (llapIo == null) {
- writer.write("LLAP IO not found");
- } else {
- writer.write(llapIo.getMemoryInfo());
- }
-
- } finally {
- if (writer != null) {
- writer.close();
- }
- }
- } catch (Exception e) {
- LOG.error("Caught exception while processing llap status request", e);
- response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
index e896df2..028daa1 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
@@ -102,7 +102,6 @@ public class LlapWebServices extends AbstractService {
this.http = builder.build();
this.http.addServlet("status", "/status", LlapStatusServlet.class);
this.http.addServlet("peers", "/peers", LlapPeerRegistryServlet.class);
- this.http.addServlet("iomem", "/iomem", LlapIoMemoryServlet.class);
} catch (IOException e) {
LOG.warn("LLAP web service failed to come up", e);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 294fb2b..7c309a4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -23,12 +23,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.management.ObjectName;
-import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon;
import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,8 +39,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
+import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator;
import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
-import org.apache.hadoop.hive.llap.cache.LlapOomDebugDump;
import org.apache.hadoop.hive.llap.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
import org.apache.hadoop.hive.llap.cache.LowLevelCacheMemoryManager;
@@ -65,6 +66,8 @@ import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.metrics2.util.MBeans;
import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
@@ -82,7 +85,6 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
private final LlapDaemonIOMetrics ioMetrics;
private ObjectName buddyAllocatorMXBean;
private final Allocator allocator;
- private final LlapOomDebugDump memoryDump;
private LlapIoImpl(Configuration conf) throws IOException {
String ioMode = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MODE);
@@ -119,36 +121,14 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
if (useLowLevelCache) {
// Memory manager uses cache policy to trigger evictions, so create the policy first.
boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU);
- long totalMemorySize = HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
- int minAllocSize = (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
- float metadataFraction = HiveConf.getFloatVar(conf, ConfVars.LLAP_IO_METADATA_FRACTION);
- long metaMem = 0;
- // TODO: this split a workaround until HIVE-15665.
- // Technically we don't have to do it for on-heap data cache but we'd do for testing.
- boolean isSplitCache = metadataFraction > 0f;
- if (isSplitCache) {
- metaMem = (long)(LlapDaemon.getTotalHeapSize() * metadataFraction);
- }
- LowLevelCachePolicy cachePolicy = useLrfu ? new LowLevelLrfuCachePolicy(
- minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy();
+ LowLevelCachePolicy cachePolicy =
+ useLrfu ? new LowLevelLrfuCachePolicy(conf) : new LowLevelFifoCachePolicy(conf);
// Allocator uses memory manager to request memory, so create the manager next.
LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
- totalMemorySize, cachePolicy, cacheMetrics);
- LowLevelCachePolicy metaCachePolicy = null;
- LowLevelCacheMemoryManager metaMemManager = null;
- if (isSplitCache) {
- metaCachePolicy = useLrfu ? new LowLevelLrfuCachePolicy(
- minAllocSize, metaMem, conf) : new LowLevelFifoCachePolicy();
- metaMemManager = new LowLevelCacheMemoryManager(metaMem, metaCachePolicy, cacheMetrics);
- } else {
- metaCachePolicy = cachePolicy;
- metaMemManager = memManager;
- }
- cacheMetrics.setCacheCapacityTotal(totalMemorySize + metaMem);
+ conf, cachePolicy, cacheMetrics);
// Cache uses allocator to allocate and deallocate, create allocator and then caches.
- BuddyAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
+ EvictionAwareAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
this.allocator = allocator;
- this.memoryDump = allocator;
LowLevelCacheImpl cacheImpl = new LowLevelCacheImpl(
cacheMetrics, cachePolicy, allocator, true);
cache = cacheImpl;
@@ -158,21 +138,15 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
serdeCache = serdeCacheImpl;
}
boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE);
- metadataCache = new OrcMetadataCache(metaMemManager, metaCachePolicy, useGapCache);
+ metadataCache = new OrcMetadataCache(memManager, cachePolicy, useGapCache);
// And finally cache policy uses cache to notify it of eviction. The cycle is complete!
- EvictionDispatcher e = new EvictionDispatcher(cache, serdeCache, metadataCache, allocator);
- if (isSplitCache) {
- metaCachePolicy.setEvictionListener(e);
- metaCachePolicy.setParentDebugDumper(e);
- }
- cachePolicy.setEvictionListener(e);
- cachePolicy.setParentDebugDumper(e);
-
+ cachePolicy.setEvictionListener(new EvictionDispatcher(
+ cache, serdeCache, metadataCache, allocator));
+ cachePolicy.setParentDebugDumper(cacheImpl);
cacheImpl.startThreads(); // Start the cache threads.
bufferManager = cacheImpl; // Cache also serves as buffer manager.
} else {
this.allocator = new SimpleAllocator(conf);
- memoryDump = null;
SimpleBufferManager sbm = new SimpleBufferManager(allocator, cacheMetrics);
bufferManager = sbm;
cache = sbm;
@@ -197,14 +171,6 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
buddyAllocatorMXBean = MBeans.register("LlapDaemon", "BuddyAllocatorInfo", allocator);
}
- @Override
- public String getMemoryInfo() {
- if (memoryDump == null) return "\nNot using the allocator";
- StringBuilder sb = new StringBuilder();
- memoryDump.debugDumpShort(sb);
- return sb.toString();
- }
-
@SuppressWarnings("rawtypes")
@Override
public InputFormat<NullWritable, VectorizedRowBatch> getInputFormat(
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 121e169..ac031aa 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -79,9 +79,9 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
cacheMetrics.incrCacheReadRequests();
OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
_skipCorrupt, counters, ioMetrics);
- OrcEncodedDataReader reader = new OrcEncodedDataReader(
- lowLevelCache, bufferManager, metadataCache, conf, job, split, columnIds, sarg,
- columnNames, edc, counters, readerSchema);
+ // Note: we use global conf here and ignore JobConf.
+ OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager,
+ metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters, readerSchema);
edc.init(reader, reader);
return edc;
}