You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2017/09/25 23:38:36 UTC
[37/86] [abbrv] hadoop git commit: YARN-7050. Post cleanup after
YARN-6903, removal of org.apache.slider package. Contributed by Jian He
http://git-wip-us.apache.org/repos/asf/hadoop/blob/727e6d78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/servicemonitor/probe/Probe.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/servicemonitor/probe/Probe.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/servicemonitor/probe/Probe.java
new file mode 100644
index 0000000..b851fb7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/servicemonitor/probe/Probe.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.servicemonitor.probe;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.service.compinstance.ComponentInstance;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Base class of all probes.
+ */
+public abstract class Probe implements MonitorKeys {
+
+ protected final Configuration conf;
+ private String name;
+
+ /**
+ * Create a probe of a specific name
+ *
+ * @param name probe name
+ * @param conf configuration being stored.
+ */
+ public Probe(String name, Configuration conf) {
+ this.name = name;
+ this.conf = conf;
+ }
+
+
+ protected void setName(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+
+ @Override
+ public String toString() {
+ return getName();
+ }
+
+ public static String getProperty(Map<String, String> props, String name,
+ String defaultValue) throws IOException {
+ String value = props.get(name);
+ if (StringUtils.isEmpty(value)) {
+ if (defaultValue == null) {
+ throw new IOException(name + " not specified");
+ }
+ return defaultValue;
+ }
+ return value;
+ }
+
+ public static int getPropertyInt(Map<String, String> props, String name,
+ Integer defaultValue) throws IOException {
+ String value = props.get(name);
+ if (StringUtils.isEmpty(value)) {
+ if (defaultValue == null) {
+ throw new IOException(name + " not specified");
+ }
+ return defaultValue;
+ }
+ return Integer.parseInt(value);
+ }
+
+ /**
+ * perform any prelaunch initialization
+ */
+ public void init() throws IOException {
+
+ }
+
+ /**
+ * Ping the endpoint. All exceptions must be caught and included in the
+ * (failure) status.
+ *
+ * @param instance instance to ping
+ * @return the status
+ */
+ public abstract ProbeStatus ping(ComponentInstance instance);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/727e6d78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/servicemonitor/probe/ProbeStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/servicemonitor/probe/ProbeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/servicemonitor/probe/ProbeStatus.java
new file mode 100644
index 0000000..7cd761c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/servicemonitor/probe/ProbeStatus.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.servicemonitor.probe;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * Status message of a probe. This is designed to be sent over the wire, though the exception
+ * Had better be unserializable at the far end if that is to work.
+ */
+public final class ProbeStatus implements Serializable {
+ private static final long serialVersionUID = 165468L;
+
+ private long timestamp;
+ private String timestampText;
+ private boolean success;
+ private boolean realOutcome;
+ private String message;
+ private Throwable thrown;
+ private transient Probe originator;
+
+ public ProbeStatus() {
+ }
+
+ public ProbeStatus(long timestamp, String message, Throwable thrown) {
+ this.success = false;
+ this.message = message;
+ this.thrown = thrown;
+ setTimestamp(timestamp);
+ }
+
+ public ProbeStatus(long timestamp, String message) {
+ this.success = true;
+ setTimestamp(timestamp);
+ this.message = message;
+ this.thrown = null;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ timestampText = new Date(timestamp).toString();
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ /**
+ * Set both the success and the real outcome bits to the same value
+ * @param success the new value
+ */
+ public void setSuccess(boolean success) {
+ this.success = success;
+ realOutcome = success;
+ }
+
+ public String getTimestampText() {
+ return timestampText;
+ }
+
+ public boolean getRealOutcome() {
+ return realOutcome;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public Throwable getThrown() {
+ return thrown;
+ }
+
+ public void setThrown(Throwable thrown) {
+ this.thrown = thrown;
+ }
+
+ /**
+ * Get the probe that generated this result. May be null
+ * @return a possibly null reference to a probe
+ */
+ public Probe getOriginator() {
+ return originator;
+ }
+
+ /**
+ * The probe has succeeded -capture the current timestamp, set
+ * success to true, and record any other data needed.
+ * @param probe probe
+ */
+ public void succeed(Probe probe) {
+ finish(probe, true, probe.getName(), null);
+ }
+
+ /**
+ * A probe has failed either because the test returned false, or an exception
+ * was thrown. The {@link #success} field is set to false, any exception
+ * thrown is recorded.
+ * @param probe probe that failed
+ * @param thrown an exception that was thrown.
+ */
+ public void fail(Probe probe, Throwable thrown) {
+ finish(probe, false, "Failure in " + probe, thrown);
+ }
+
+ public void finish(Probe probe, boolean succeeded, String text, Throwable thrown) {
+ setTimestamp(System.currentTimeMillis());
+ setSuccess(succeeded);
+ originator = probe;
+ message = text;
+ this.thrown = thrown;
+ }
+
+ @Override
+ public String toString() {
+ LogEntryBuilder builder = new LogEntryBuilder("Probe Status");
+ builder.elt("time", timestampText)
+ .elt("outcome", (success ? "success" : "failure"));
+
+ if (success != realOutcome) {
+ builder.elt("originaloutcome", (realOutcome ? "success" : "failure"));
+ }
+ builder.elt("message", message);
+ if (thrown != null) {
+ builder.elt("exception", thrown);
+ }
+
+ return builder.toString();
+ }
+
+ /**
+ * Flip the success bit on while the real outcome bit is kept false
+ */
+ public void markAsSuccessful() {
+ success = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/727e6d78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineMetricsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineMetricsConstants.java
index 4f39921..78a7171 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineMetricsConstants.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineMetricsConstants.java
@@ -84,9 +84,6 @@ public final class ServiceTimelineMetricsConstants {
public static final String DESCRIPTION = "DESCRIPTION";
- public static final String UNIQUE_COMPONENT_SUPPORT =
- "UNIQUE_COMPONENT_SUPPORT";
-
public static final String RUN_PRIVILEGED_CONTAINER =
"RUN_PRIVILEGED_CONTAINER";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/727e6d78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
index f115063..243baea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
@@ -21,24 +21,19 @@ package org.apache.hadoop.yarn.service.timelineservice;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-import org.apache.slider.api.resource.Application;
-import org.apache.slider.api.resource.Component;
-import org.apache.slider.api.resource.ConfigFile;
-import org.apache.slider.api.resource.Configuration;
-import org.apache.slider.api.resource.Container;
-import org.apache.slider.common.tools.SliderUtils;
-import org.apache.slider.server.appmaster.actions.ActionStopSlider;
-import org.apache.hadoop.yarn.service.compinstance.ComponentInstance;
import org.apache.hadoop.yarn.service.ServiceContext;
-import org.apache.slider.server.appmaster.state.AppState;
-import org.apache.slider.server.appmaster.state.RoleInstance;
+import org.apache.hadoop.yarn.service.api.records.Application;
+import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.ConfigFile;
+import org.apache.hadoop.yarn.service.api.records.Configuration;
+import org.apache.hadoop.yarn.service.api.records.Container;
+import org.apache.hadoop.yarn.service.compinstance.ComponentInstance;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +45,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import static org.apache.hadoop.yarn.service.timelineservice.ServiceTimelineMetricsConstants.DIAGNOSTICS_INFO;
+
/**
* A single service that publishes all the Timeline Entities.
*/
@@ -87,7 +84,8 @@ public class ServiceTimelinePublisher extends CompositeService {
timelineClient = client;
}
- public void serviceAttemptRegistered(Application application) {
+ public void serviceAttemptRegistered(Application application,
+ org.apache.hadoop.conf.Configuration systemConf) {
long currentTimeMillis = application.getLaunchTime() == null
? System.currentTimeMillis() : application.getLaunchTime().getTime();
@@ -114,9 +112,12 @@ public class ServiceTimelinePublisher extends CompositeService {
// publish before configurations published
putEntity(entity);
- // publish application specific configurations
- publishConfigurations(application.getConfiguration(), application.getId(),
- ServiceTimelineEntityType.SERVICE_ATTEMPT.toString(), true);
+ // publish system config - YarnConfiguration
+ populateTimelineEntity(systemConf.iterator(), application.getId(),
+ ServiceTimelineEntityType.SERVICE_ATTEMPT.toString());
+ // publish user conf
+ publishUserConf(application.getConfiguration(), application.getId(),
+ ServiceTimelineEntityType.SERVICE_ATTEMPT.toString());
// publish component as separate entity.
publishComponents(application.getComponents());
@@ -129,12 +130,14 @@ public class ServiceTimelinePublisher extends CompositeService {
putEntity(entity);
}
- public void serviceAttemptUnregistered(ServiceContext context) {
+ public void serviceAttemptUnregistered(ServiceContext context,
+ String diagnostics) {
TimelineEntity entity = createServiceAttemptEntity(
context.attemptId.getApplicationId().toString());
Map<String, Object> entityInfos = new HashMap<String, Object>();
entityInfos.put(ServiceTimelineMetricsConstants.STATE,
- FinalApplicationStatus.FAILED);
+ FinalApplicationStatus.ENDED);
+ entityInfos.put(DIAGNOSTICS_INFO, diagnostics);
entity.addInfo(entityInfos);
// add an event
@@ -147,39 +150,6 @@ public class ServiceTimelinePublisher extends CompositeService {
putEntity(entity);
}
- public void serviceAttemptUnregistered(AppState appState,
- ActionStopSlider stopAction) {
- long currentTimeMillis = System.currentTimeMillis();
-
- TimelineEntity entity =
- createServiceAttemptEntity(appState.getClusterStatus().getId());
-
- // add info
- Map<String, Object> entityInfos = new HashMap<String, Object>();
- entityInfos.put(ServiceTimelineMetricsConstants.EXIT_STATUS_CODE,
- stopAction.getExitCode());
- entityInfos.put(ServiceTimelineMetricsConstants.STATE,
- stopAction.getFinalApplicationStatus().toString());
- if (stopAction.getMessage() != null) {
- entityInfos.put(ServiceTimelineMetricsConstants.EXIT_REASON,
- stopAction.getMessage());
- }
- if (stopAction.getEx() != null) {
- entityInfos.put(ServiceTimelineMetricsConstants.DIAGNOSTICS_INFO,
- stopAction.getEx().toString());
- }
- entity.addInfo(entityInfos);
-
- // add an event
- TimelineEvent startEvent = new TimelineEvent();
- startEvent
- .setId(ServiceTimelineEvent.SERVICE_ATTEMPT_UNREGISTERED.toString());
- startEvent.setTimestamp(currentTimeMillis);
- entity.addEvent(startEvent);
-
- putEntity(entity);
- }
-
public void componentInstanceStarted(Container container,
ComponentInstance instance) {
@@ -210,29 +180,6 @@ public class ServiceTimelinePublisher extends CompositeService {
putEntity(entity);
}
- public void componentInstanceFinished(RoleInstance instance) {
- TimelineEntity entity = createComponentInstanceEntity(instance.id);
-
- // create info keys
- Map<String, Object> entityInfos = new HashMap<String, Object>();
- entityInfos.put(ServiceTimelineMetricsConstants.EXIT_STATUS_CODE,
- instance.exitCode);
- entityInfos.put(ServiceTimelineMetricsConstants.DIAGNOSTICS_INFO,
- instance.diagnostics);
- // TODO need to change the state based on enum value.
- entityInfos.put(ServiceTimelineMetricsConstants.STATE, "FINISHED");
- entity.addInfo(entityInfos);
-
- // add an event
- TimelineEvent startEvent = new TimelineEvent();
- startEvent
- .setId(ServiceTimelineEvent.COMPONENT_INSTANCE_UNREGISTERED.toString());
- startEvent.setTimestamp(System.currentTimeMillis());
- entity.addEvent(startEvent);
-
- putEntity(entity);
- }
-
public void componentInstanceFinished(ComponentInstance instance,
int exitCode, ContainerState state, String diagnostics) {
TimelineEntity entity = createComponentInstanceEntity(
@@ -242,7 +189,7 @@ public class ServiceTimelinePublisher extends CompositeService {
Map<String, Object> entityInfos = new HashMap<String, Object>();
entityInfos.put(ServiceTimelineMetricsConstants.EXIT_STATUS_CODE,
exitCode);
- entityInfos.put(ServiceTimelineMetricsConstants.DIAGNOSTICS_INFO, diagnostics);
+ entityInfos.put(DIAGNOSTICS_INFO, diagnostics);
entityInfos.put(ServiceTimelineMetricsConstants.STATE, state);
entity.addInfo(entityInfos);
@@ -302,8 +249,6 @@ public class ServiceTimelinePublisher extends CompositeService {
entityInfos.put(ServiceTimelineMetricsConstants.LAUNCH_COMMAND,
component.getLaunchCommand());
}
- entityInfos.put(ServiceTimelineMetricsConstants.UNIQUE_COMPONENT_SUPPORT,
- component.getUniqueComponentSupport().toString());
entityInfos.put(ServiceTimelineMetricsConstants.RUN_PRIVILEGED_CONTAINER,
component.getRunPrivilegedContainer().toString());
if (component.getPlacementPolicy() != null) {
@@ -315,31 +260,26 @@ public class ServiceTimelinePublisher extends CompositeService {
putEntity(entity);
// publish component specific configurations
- publishConfigurations(component.getConfiguration(), component.getName(),
- ServiceTimelineEntityType.COMPONENT.toString(), false);
+ publishUserConf(component.getConfiguration(), component.getName(),
+ ServiceTimelineEntityType.COMPONENT.toString());
}
}
- private void publishConfigurations(Configuration configuration,
- String entityId, String entityType, boolean isServiceAttemptEntity) {
- if (isServiceAttemptEntity) {
- // publish slider-client.xml properties at service level
- publishConfigurations(SliderUtils.loadSliderClientXML().iterator(),
- entityId, entityType);
- }
- publishConfigurations(configuration.getProperties().entrySet().iterator(),
+ private void publishUserConf(Configuration configuration,
+ String entityId, String entityType) {
+ populateTimelineEntity(configuration.getProperties().entrySet().iterator(),
entityId, entityType);
- publishConfigurations(configuration.getEnv().entrySet().iterator(),
+ populateTimelineEntity(configuration.getEnv().entrySet().iterator(),
entityId, entityType);
for (ConfigFile configFile : configuration.getFiles()) {
- publishConfigurations(configFile.getProps().entrySet().iterator(),
+ populateTimelineEntity(configFile.getProps().entrySet().iterator(),
entityId, entityType);
}
}
- private void publishConfigurations(Iterator<Entry<String, String>> iterator,
+ private void populateTimelineEntity(Iterator<Entry<String, String>> iterator,
String entityId, String entityType) {
int configSize = 0;
TimelineEntity entity = createTimelineEntity(entityId, entityType);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/727e6d78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ApplicationReportSerDeser.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ApplicationReportSerDeser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ApplicationReportSerDeser.java
new file mode 100644
index 0000000..2607c08
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ApplicationReportSerDeser.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.utils;
+
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+
+import java.io.IOException;
+
+/**
+ * Persistence of {@link SerializedApplicationReport}
+ *
+ */
+public class ApplicationReportSerDeser
+ extends JsonSerDeser<SerializedApplicationReport> {
+ public ApplicationReportSerDeser() {
+ super(SerializedApplicationReport.class);
+ }
+
+
+ private static final ApplicationReportSerDeser
+ staticinstance = new ApplicationReportSerDeser();
+
+ /**
+ * Convert an instance to a JSON string -sync access to a shared ser/deser
+ * object instance
+ * @param instance object to convert
+ * @return a JSON string description
+ * @throws JsonParseException parse problems
+ * @throws JsonMappingException O/J mapping problems
+ */
+ public static String toString(SerializedApplicationReport instance)
+ throws IOException, JsonGenerationException, JsonMappingException {
+ synchronized (staticinstance) {
+ return staticinstance.toJson(instance);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/727e6d78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ClientRegistryBinder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ClientRegistryBinder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ClientRegistryBinder.java
new file mode 100644
index 0000000..86896b2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ClientRegistryBinder.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.utils;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
+import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.hadoop.registry.client.binding.RegistryPathUtils.encodeForRegistry;
+import static org.apache.hadoop.registry.client.binding.RegistryUtils.convertUsername;
+import static org.apache.hadoop.registry.client.binding.RegistryUtils.getCurrentUsernameUnencoded;
+import static org.apache.hadoop.registry.client.binding.RegistryUtils.servicePath;
+
+/**
+ * Generic code to get the URLs for clients via the registry
+ */
+public class ClientRegistryBinder {
+ private static final Logger log =
+ LoggerFactory.getLogger(ClientRegistryBinder.class);
+
+ private final RegistryOperations operations;
+
+ public ClientRegistryBinder(RegistryOperations operations) {
+ this.operations = operations;
+ }
+
+ /**
+ * Buld the user path -switches to the system path if the user is "".
+ * It also cross-converts the username to ascii via punycode
+ * @param username username or ""
+ * @return the path to the user
+ */
+ public static String homePathForUser(String username) {
+ Preconditions.checkArgument(username != null, "null user");
+
+ // catch recursion
+ if (username.startsWith(RegistryConstants.PATH_USERS)) {
+ return username;
+ }
+
+ if (username.isEmpty()) {
+ return RegistryConstants.PATH_SYSTEM_SERVICES;
+ }
+
+ // convert username to registry name
+ String convertedName = convertUsername(username);
+
+ return RegistryPathUtils.join(RegistryConstants.PATH_USERS,
+ encodeForRegistry(convertedName));
+ }
+
+ /**
+ * Get the current username, before any encoding has been applied.
+ * @return the current user from the kerberos identity, falling back
+ * to the user and/or env variables.
+ */
+ public static String currentUsernameUnencoded() {
+ String env_hadoop_username = System.getenv(
+ RegistryInternalConstants.HADOOP_USER_NAME);
+ return getCurrentUsernameUnencoded(env_hadoop_username);
+ }
+
+ /**
+ * Qualify a user.
+ * <ol>
+ * <li> <code>"~"</code> maps to user home path home</li>
+ * <li> <code>"~user"</code> maps to <code>/users/$user</code></li>
+ * <li> <code>"/"</code> maps to <code>/services/</code></li>
+ * </ol>
+ * @param user the username
+ * @return the base path
+ */
+ public static String qualifyUser(String user) {
+ // qualify the user
+ String t = user.trim();
+ if (t.startsWith("/")) {
+ // already resolved
+ return t;
+ } else if (t.equals("~")) {
+ // self
+ return currentUsernameUnencoded();
+ } else if (t.startsWith("~")) {
+ // another user
+ // convert username to registry name
+ String convertedName = convertUsername(t.substring(1));
+
+ return RegistryPathUtils.join(RegistryConstants.PATH_USERS,
+ encodeForRegistry(convertedName));
+ } else {
+ return "/" + t;
+ }
+ }
+
+ /**
+ * Look up an external REST API
+ * @param user user which will be qualified as per {@link #qualifyUser(String)}
+ * @param serviceClass service class
+ * @param instance instance name
+ * @param api API
+ * @return the API, or an exception is raised.
+ * @throws IOException
+ */
+ public String lookupExternalRestAPI(String user,
+ String serviceClass,
+ String instance,
+ String api)
+ throws IOException {
+ String qualified = qualifyUser(user);
+ String path = servicePath(qualified, serviceClass, instance);
+ String restAPI = resolveExternalRestAPI(api, path);
+ if (restAPI == null) {
+ throw new PathNotFoundException(path + " API " + api);
+ }
+ return restAPI;
+ }
+
+ /**
+ * Resolve a service record then return an external REST API exported it.
+ *
+ * @param api API to resolve
+ * @param path path of the service record
+ * @return null if the record exists but the API is absent or it has no
+ * REST endpoints.
+ * @throws IOException resolution problems, as covered in
+ * {@link RegistryOperations#resolve(String)}
+ */
+ protected String resolveExternalRestAPI(String api, String path) throws
+ IOException {
+ ServiceRecord record = operations.resolve(path);
+ return lookupRestAPI(record, api, true);
+ }
+
+ /**
+ * Look up an external REST API endpoint
+ * @param record service record
+ * @param api URI of api
+ * @param external flag to indicate this is an external record
+ * @return the first endpoint of the implementation, or null if there
+ * is no entry for the API, implementation or it's the wrong type.
+ */
+ public static String lookupRestAPI(ServiceRecord record,
+ String api, boolean external) throws InvalidRecordException {
+ try {
+ String url = null;
+ Endpoint endpoint = getEndpoint(record, api, external);
+ List<String> addresses =
+ RegistryTypeUtils.retrieveAddressesUriType(endpoint);
+ if (addresses != null && !addresses.isEmpty()) {
+ url = addresses.get(0);
+ }
+ return url;
+ } catch (InvalidRecordException e) {
+ log.debug("looking for API {}", api, e);
+ return null;
+ }
+ }
+
+ /**
+ * Get an endpont by API
+ * @param record service record
+ * @param api API
+ * @param external flag to indicate this is an external record
+ * @return the endpoint or null
+ */
+ public static Endpoint getEndpoint(ServiceRecord record,
+ String api,
+ boolean external) {
+ return external ? record.getExternalEndpoint(api)
+ : record.getInternalEndpoint(api);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/727e6d78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/Comparators.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/Comparators.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/Comparators.java
new file mode 100644
index 0000000..9f0e5d4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/Comparators.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.utils;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ * Some general comparators
+ */
+public class Comparators {
+
+ public static class LongComparator implements Comparator<Long>, Serializable {
+ @Override
+ public int compare(Long o1, Long o2) {
+ return o1.compareTo(o2);
+ }
+ }
+
+ public static class InvertedLongComparator
+ implements Comparator<Long>, Serializable {
+ @Override
+ public int compare(Long o1, Long o2) {
+ return o2.compareTo(o1);
+ }
+ }
+
+ /**
+ * Little template class to reverse any comparitor
+ * @param <CompareType> the type that is being compared
+ */
+ public static class ComparatorReverser<CompareType> implements Comparator<CompareType>,
+ Serializable {
+
+ final Comparator<CompareType> instance;
+
+ public ComparatorReverser(Comparator<CompareType> instance) {
+ this.instance = instance;
+ }
+
+ @Override
+ public int compare(CompareType first, CompareType second) {
+ return instance.compare(second, first);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/727e6d78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigHelper.java
new file mode 100644
index 0000000..fe8cce8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigHelper.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.utils;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.service.exceptions.BadConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.net.URL;
+import java.util.Map;
+
+/**
+ * Methods to aid in config, both in the Configuration class and
+ * with other parts of setting up Slider-initated processes.
+ *
+ * Some of the methods take an argument of a map iterable for their sources; this allows
+ * the same method
+ */
+public class ConfigHelper {
+ private static final Logger log = LoggerFactory.getLogger(ConfigHelper.class);
+
+ /**
+ * Set an entire map full of values
+ *
+ * @param config config to patch
+ * @param map map of data
+ * @param origin origin data
+ */
+ public static void addConfigMap(Configuration config,
+ Map<String, String> map,
+ String origin) throws BadConfigException {
+ addConfigMap(config, map.entrySet(), origin);
+ }
+
+ /**
+ * Set an entire map full of values
+ *
+ * @param config config to patch
+ * @param map map of data
+ * @param origin origin data
+ */
+ public static void addConfigMap(Configuration config,
+ Iterable<Map.Entry<String, String>> map,
+ String origin) throws BadConfigException {
+ for (Map.Entry<String, String> mapEntry : map) {
+ String key = mapEntry.getKey();
+ String value = mapEntry.getValue();
+ if (value == null) {
+ throw new BadConfigException("Null value for property " + key);
+ }
+ config.set(key, value, origin);
+ }
+ }
+
+ /**
+ * Convert to an XML string
+ * @param conf configuration
+ * @return conf
+ * @throws IOException
+ */
+ public static String toXml(Configuration conf) throws IOException {
+ StringWriter writer = new StringWriter();
+ conf.writeXml(writer);
+ return writer.toString();
+ }
+
+
+ /**
+ * Register a resource as a default resource.
+ * Do not attempt to use this unless you understand that the
+ * order in which default resources are loaded affects the outcome,
+ * and that subclasses of Configuration often register new default
+ * resources
+ * @param resource the resource name
+ * @return the URL or null
+ */
+ public static URL registerDefaultResource(String resource) {
+ URL resURL = getResourceUrl(resource);
+ if (resURL != null) {
+ Configuration.addDefaultResource(resource);
+ }
+ return resURL;
+ }
+
+ /**
+ * Load a configuration from a resource on this classpath.
+ * If the resource is not found, an empty configuration is returned
+ * @param resource the resource name
+ * @return the loaded configuration.
+ */
+ public static Configuration loadFromResource(String resource) {
+ Configuration conf = new Configuration(false);
+ URL resURL = getResourceUrl(resource);
+ if (resURL != null) {
+ log.debug("loaded resources from {}", resURL);
+ conf.addResource(resource);
+ } else{
+ log.debug("failed to find {} on the classpath", resource);
+ }
+ return conf;
+
+ }
+
+ /**
+ * Get the URL to a resource, null if not on the CP
+ * @param resource resource to look for
+ * @return the URL or null
+ */
+ public static URL getResourceUrl(String resource) {
+ return ConfigHelper.class.getClassLoader()
+ .getResource(resource);
+ }
+
+ /**
+ * This goes through the keyset of one configuration and retrieves each value
+ * from a value source -a different or the same configuration. This triggers
+ * the property resolution process of the value, resolving any variables against
+ * in-config or inherited configurations
+ * @param keysource source of keys
+ * @param valuesource the source of values
+ * @return a new configuration where <code>foreach key in keysource, get(key)==valuesource.get(key)</code>
+ */
+ public static Configuration resolveConfiguration(
+ Iterable<Map.Entry<String, String>> keysource,
+ Configuration valuesource) {
+ Configuration result = new Configuration(false);
+ for (Map.Entry<String, String> entry : keysource) {
+ String key = entry.getKey();
+ String value = valuesource.get(key);
+ Preconditions.checkState(value != null,
+ "no reference for \"%s\" in values", key);
+ result.set(key, value);
+ }
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/727e6d78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigUtils.java
new file mode 100644
index 0000000..a969be9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigUtils.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.service.utils;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.service.api.records.ConfigFormat;
+import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class ConfigUtils {
+ public static final String TEMPLATE_FILE = "template.file";
+
+ public static String replaceProps(Map<String, String> config, String content) {
+ Map<String, String> tokens = new HashMap<>();
+ for (Entry<String, String> entry : config.entrySet()) {
+ tokens.put("${" + entry.getKey() + "}", entry.getValue());
+ tokens.put("{{" + entry.getKey() + "}}", entry.getValue());
+ }
+ String value = content;
+ for (Map.Entry<String,String> token : tokens.entrySet()) {
+ value = value.replaceAll(Pattern.quote(token.getKey()),
+ Matcher.quoteReplacement(token.getValue()));
+ }
+ return value;
+ }
+
+ public static Map<String, String> replacePropsInConfig(
+ Map<String, String> config, Map<String, String> env) {
+ Map<String, String> tokens = new HashMap<>();
+ for (Entry<String, String> entry : env.entrySet()) {
+ tokens.put("${" + entry.getKey() + "}", entry.getValue());
+ }
+ Map<String, String> newConfig = new HashMap<>();
+ for (Entry<String, String> entry : config.entrySet()) {
+ String value = entry.getValue();
+ for (Map.Entry<String,String> token : tokens.entrySet()) {
+ value = value.replaceAll(Pattern.quote(token.getKey()),
+ Matcher.quoteReplacement(token.getValue()));
+ }
+ newConfig.put(entry.getKey(), entry.getValue());
+ }
+ return newConfig;
+ }
+
+ public static void prepConfigForTemplateOutputter(ConfigFormat configFormat,
+ Map<String, String> config, SliderFileSystem fileSystem,
+ String clusterName, String fileName) throws IOException {
+ if (!configFormat.equals(ConfigFormat.TEMPLATE)) {
+ return;
+ }
+ Path templateFile = null;
+ if (config.containsKey(TEMPLATE_FILE)) {
+ templateFile = fileSystem.buildResourcePath(config.get(TEMPLATE_FILE));
+ if (!fileSystem.isFile(templateFile)) {
+ templateFile = fileSystem.buildResourcePath(clusterName,
+ config.get(TEMPLATE_FILE));
+ }
+ if (!fileSystem.isFile(templateFile)) {
+ throw new IOException("config specified template file " + config
+ .get(TEMPLATE_FILE) + " but " + templateFile + " doesn't exist");
+ }
+ }
+ if (templateFile == null && fileName != null) {
+ templateFile = fileSystem.buildResourcePath(fileName);
+ if (!fileSystem.isFile(templateFile)) {
+ templateFile = fileSystem.buildResourcePath(clusterName,
+ fileName);
+ }
+ }
+ if (fileSystem.isFile(templateFile)) {
+ config.put("content", fileSystem.cat(templateFile));
+ } else {
+ config.put("content", "");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/727e6d78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java
new file mode 100644
index 0000000..fa3b402
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.utils;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.service.conf.SliderExitCodes;
+import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
+import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
+import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
+import org.apache.hadoop.yarn.service.exceptions.ErrorStrings;
+import org.apache.hadoop.yarn.service.exceptions.SliderException;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CoreFileSystem {
+ private static final Logger
+ log = LoggerFactory.getLogger(CoreFileSystem.class);
+
+ private static final String UTF_8 = "UTF-8";
+
+ protected final FileSystem fileSystem;
+ protected final Configuration configuration;
+
+ public CoreFileSystem(FileSystem fileSystem, Configuration configuration) {
+ Preconditions.checkNotNull(fileSystem,
+ "Cannot create a CoreFileSystem with a null FileSystem");
+ Preconditions.checkNotNull(configuration,
+ "Cannot create a CoreFileSystem with a null Configuration");
+ this.fileSystem = fileSystem;
+ this.configuration = configuration;
+ }
+
+ public CoreFileSystem(Configuration configuration) throws IOException {
+ Preconditions.checkNotNull(configuration,
+ "Cannot create a CoreFileSystem with a null Configuration");
+ this.fileSystem = FileSystem.get(configuration);
+ this.configuration = configuration;
+ }
+
+ /**
+ * Get the temp path for this cluster
+ * @param clustername name of the cluster
+ * @return path for temp files (is not purged)
+ */
+ public Path getTempPathForCluster(String clustername) {
+ Path clusterDir = buildClusterDirPath(clustername);
+ return new Path(clusterDir, YarnServiceConstants.TMP_DIR_PREFIX);
+ }
+
+ /**
+ * Returns the underlying FileSystem for this object.
+ *
+ * @return filesystem
+ */
+ public FileSystem getFileSystem() {
+ return fileSystem;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb =
+ new StringBuilder("CoreFileSystem{");
+ sb.append("fileSystem=").append(fileSystem.getUri());
+ sb.append('}');
+ return sb.toString();
+ }
+
+ /**
+ * Build up the path string for a cluster instance -no attempt to
+ * create the directory is made
+ *
+ * @param clustername name of the cluster
+ * @return the path for persistent data
+ */
+ public Path buildClusterDirPath(String clustername) {
+ Preconditions.checkNotNull(clustername);
+ Path path = getBaseApplicationPath();
+ return new Path(path, YarnServiceConstants.SERVICES_DIRECTORY + "/" + clustername);
+ }
+
+
+ /**
+ * Build up the path string for keytab install location -no attempt to
+ * create the directory is made
+ *
+ * @return the path for keytab
+ */
+ public Path buildKeytabInstallationDirPath(String keytabFolder) {
+ Preconditions.checkNotNull(keytabFolder);
+ Path path = getBaseApplicationPath();
+ return new Path(path, YarnServiceConstants.KEYTAB_DIR + "/" + keytabFolder);
+ }
+
+ /**
+ * Build up the path string for keytab install location -no attempt to
+ * create the directory is made
+ *
+ * @return the path for keytab installation location
+ */
+ public Path buildKeytabPath(String keytabDir, String keytabName, String clusterName) {
+ Path homePath = getHomeDirectory();
+ Path baseKeytabDir;
+ if (keytabDir != null) {
+ baseKeytabDir = new Path(homePath, keytabDir);
+ } else {
+ baseKeytabDir = new Path(buildClusterDirPath(clusterName),
+ YarnServiceConstants.KEYTAB_DIR);
+ }
+ return keytabName == null ? baseKeytabDir :
+ new Path(baseKeytabDir, keytabName);
+ }
+
+ /**
+ * Build up the path string for resource install location -no attempt to
+ * create the directory is made
+ *
+ * @return the path for resource
+ */
+ public Path buildResourcePath(String resourceFolder) {
+ Preconditions.checkNotNull(resourceFolder);
+ Path path = getBaseApplicationPath();
+ return new Path(path, YarnServiceConstants.RESOURCE_DIR + "/" + resourceFolder);
+ }
+
+ /**
+ * Build up the path string for resource install location -no attempt to
+ * create the directory is made
+ *
+ * @return the path for resource
+ */
+ public Path buildResourcePath(String dirName, String fileName) {
+ Preconditions.checkNotNull(dirName);
+ Preconditions.checkNotNull(fileName);
+ Path path = getBaseApplicationPath();
+ return new Path(path, YarnServiceConstants.RESOURCE_DIR + "/" + dirName + "/" + fileName);
+ }
+
+ /**
+ * Create a directory with the given permissions.
+ *
+ * @param dir directory
+ * @param clusterPerms cluster permissions
+ * @throws IOException IO problem
+ * @throws BadClusterStateException any cluster state problem
+ */
+ @SuppressWarnings("deprecation")
+ public void createWithPermissions(Path dir, FsPermission clusterPerms) throws
+ IOException,
+ BadClusterStateException {
+ if (fileSystem.isFile(dir)) {
+ // HADOOP-9361 shows some filesystems don't correctly fail here
+ throw new BadClusterStateException(
+ "Cannot create a directory over a file %s", dir);
+ }
+ log.debug("mkdir {} with perms {}", dir, clusterPerms);
+ //no mask whatoever
+ fileSystem.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
+ fileSystem.mkdirs(dir, clusterPerms);
+ //and force set it anyway just to make sure
+ fileSystem.setPermission(dir, clusterPerms);
+ }
+
+ /**
+ * Verify that the cluster directory is not present
+ *
+ * @param clustername name of the cluster
+ * @param clusterDirectory actual directory to look for
+ * @throws IOException trouble with FS
+ * @throws SliderException If the directory exists
+ */
+ public void verifyClusterDirectoryNonexistent(String clustername,
+ Path clusterDirectory)
+ throws IOException, SliderException {
+ if (fileSystem.exists(clusterDirectory)) {
+ throw new SliderException(SliderExitCodes.EXIT_INSTANCE_EXISTS,
+ ErrorStrings.PRINTF_E_INSTANCE_ALREADY_EXISTS, clustername,
+ clusterDirectory);
+ }
+ }
+ /**
+ * Verify that the given directory is not present
+ *
+ * @param clusterDirectory actual directory to look for
+ * @throws IOException trouble with FS
+ * @throws SliderException If the directory exists
+ */
+ public void verifyDirectoryNonexistent(Path clusterDirectory) throws
+ IOException,
+ SliderException {
+ if (fileSystem.exists(clusterDirectory)) {
+
+ log.error("Dir {} exists: {}",
+ clusterDirectory,
+ listFSDir(clusterDirectory));
+ throw new SliderException(SliderExitCodes.EXIT_INSTANCE_EXISTS,
+ ErrorStrings.PRINTF_E_INSTANCE_DIR_ALREADY_EXISTS,
+ clusterDirectory);
+ }
+ }
+
+ /**
+ * Verify that a user has write access to a directory.
+ * It does this by creating then deleting a temp file
+ *
+ * @param dirPath actual directory to look for
+ * @throws FileNotFoundException file not found
+ * @throws IOException trouble with FS
+ * @throws BadClusterStateException if the directory is not writeable
+ */
+ public void verifyDirectoryWriteAccess(Path dirPath) throws IOException,
+ SliderException {
+ verifyPathExists(dirPath);
+ Path tempFile = new Path(dirPath, "tmp-file-for-checks");
+ try {
+ FSDataOutputStream out ;
+ out = fileSystem.create(tempFile, true);
+ IOUtils.closeStream(out);
+ fileSystem.delete(tempFile, false);
+ } catch (IOException e) {
+ log.warn("Failed to create file {}: {}", tempFile, e);
+ throw new BadClusterStateException(e,
+ "Unable to write to directory %s : %s", dirPath, e.toString());
+ }
+ }
+
+ /**
+ * Verify that a path exists
+ * @param path path to check
+ * @throws FileNotFoundException file not found
+ * @throws IOException trouble with FS
+ */
+ public void verifyPathExists(Path path) throws IOException {
+ if (!fileSystem.exists(path)) {
+ throw new FileNotFoundException(path.toString());
+ }
+ }
+
+ /**
+ * Verify that a path exists
+ * @param path path to check
+ * @throws FileNotFoundException file not found or is not a file
+ * @throws IOException trouble with FS
+ */
+ public void verifyFileExists(Path path) throws IOException {
+ FileStatus status = fileSystem.getFileStatus(path);
+
+ if (!status.isFile()) {
+ throw new FileNotFoundException("Not a file: " + path.toString());
+ }
+ }
+
+ /**
+ * Given a path, check if it exists and is a file
+ *
+ * @param path
+ * absolute path to the file to check
+ * @return true if and only if path exists and is a file, false for all other
+ * reasons including if file check throws IOException
+ */
+ public boolean isFile(Path path) {
+ boolean isFile = false;
+ try {
+ FileStatus status = fileSystem.getFileStatus(path);
+ if (status.isFile()) {
+ isFile = true;
+ }
+ } catch (IOException e) {
+ // ignore, isFile is already set to false
+ }
+ return isFile;
+ }
+
+ /**
+ * Get the base path
+ *
+ * @return the base path optionally configured by
+ * {@link YarnServiceConf#YARN_SERVICE_BASE_PATH}
+ */
+ public Path getBaseApplicationPath() {
+ String configuredBasePath = configuration
+ .get(YarnServiceConf.YARN_SERVICE_BASE_PATH,
+ getHomeDirectory() + "/" + YarnServiceConstants.SERVICE_BASE_DIRECTORY);
+ return new Path(configuredBasePath);
+ }
+
+ /**
+ * Get slider dependency parent dir in HDFS
+ *
+ * @return the parent dir path of slider.tar.gz in HDFS
+ */
+ public Path getDependencyPath() {
+ String parentDir = YarnServiceConstants.DEPENDENCY_DIR;
+ return new Path(String.format(parentDir, VersionInfo.getVersion()));
+ }
+
+ /**
+ * Get slider.tar.gz absolute filepath in HDFS
+ *
+ * @return the absolute path to slider.tar.gz in HDFS
+ */
+ public Path getDependencyTarGzip() {
+ Path dependencyLibAmPath = getDependencyPath();
+ Path dependencyLibTarGzip = new Path(
+ dependencyLibAmPath.toUri().toString(),
+ YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_NAME
+ + YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_EXT);
+ return dependencyLibTarGzip;
+ }
+
+ public Path getHomeDirectory() {
+ return fileSystem.getHomeDirectory();
+ }
+
+ /**
+ * Create an AM resource from the
+ *
+ * @param destPath dest path in filesystem
+ * @param resourceType resource type
+ * @return the local resource for AM
+ */
+ public LocalResource createAmResource(Path destPath, LocalResourceType resourceType) throws IOException {
+ FileStatus destStatus = fileSystem.getFileStatus(destPath);
+ LocalResource amResource = Records.newRecord(LocalResource.class);
+ amResource.setType(resourceType);
+ // Set visibility of the resource
+ // Setting to most private option
+ amResource.setVisibility(LocalResourceVisibility.APPLICATION);
+ // Set the resource to be copied over
+ amResource.setResource(
+ URL.fromPath(fileSystem.resolvePath(destStatus.getPath())));
+ // Set timestamp and length of file so that the framework
+ // can do basic sanity checks for the local resource
+ // after it has been copied over to ensure it is the same
+ // resource the client intended to use with the application
+ amResource.setTimestamp(destStatus.getModificationTime());
+ amResource.setSize(destStatus.getLen());
+ return amResource;
+ }
+
+ /**
+ * Register all files under a fs path as a directory to push out
+ *
+ * @param srcDir src dir
+ * @param destRelativeDir dest dir (no trailing /)
+ * @return the map of entries
+ */
+ public Map<String, LocalResource> submitDirectory(Path srcDir, String destRelativeDir) throws IOException {
+ //now register each of the files in the directory to be
+ //copied to the destination
+ FileStatus[] fileset = fileSystem.listStatus(srcDir);
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>(fileset.length);
+ for (FileStatus entry : fileset) {
+
+ LocalResource resource = createAmResource(entry.getPath(),
+ LocalResourceType.FILE);
+ String relativePath = destRelativeDir + "/" + entry.getPath().getName();
+ localResources.put(relativePath, resource);
+ }
+ return localResources;
+ }
+
+ /**
+ * Submit a JAR containing a specific class, returning
+ * the resource to be mapped in
+ *
+ * @param clazz class to look for
+ * @param subdir subdirectory (expected to end in a "/")
+ * @param jarName <i>At the destination</i>
+ * @return the local resource ref
+ * @throws IOException trouble copying to HDFS
+ */
+ public LocalResource submitJarWithClass(Class clazz, Path tempPath, String subdir, String jarName)
+ throws IOException, SliderException {
+ File localFile = SliderUtils.findContainingJarOrFail(clazz);
+ return submitFile(localFile, tempPath, subdir, jarName);
+ }
+
+ /**
+ * Submit a local file to the filesystem references by the instance's cluster
+ * filesystem
+ *
+ * @param localFile filename
+ * @param subdir subdirectory (expected to end in a "/")
+ * @param destFileName destination filename
+ * @return the local resource ref
+ * @throws IOException trouble copying to HDFS
+ */
+ public LocalResource submitFile(File localFile, Path tempPath, String subdir, String destFileName)
+ throws IOException {
+ Path src = new Path(localFile.toString());
+ Path subdirPath = new Path(tempPath, subdir);
+ fileSystem.mkdirs(subdirPath);
+ Path destPath = new Path(subdirPath, destFileName);
+ log.debug("Copying {} (size={} bytes) to {}", localFile, localFile.length(), destPath);
+
+ fileSystem.copyFromLocalFile(false, true, src, destPath);
+
+ // Set the type of resource - file or archive
+ // archives are untarred at destination
+ // we don't need the jar file to be untarred for now
+ return createAmResource(destPath, LocalResourceType.FILE);
+ }
+
+ /**
+ * Submit the AM tar.gz resource referenced by the instance's cluster
+ * filesystem. Also, update the providerResources object with the new
+ * resource.
+ *
+ * @param providerResources
+ * the provider resource map to be updated
+ * @throws IOException
+ * trouble copying to HDFS
+ */
+ public void submitTarGzipAndUpdate(
+ Map<String, LocalResource> providerResources) throws IOException,
+ BadClusterStateException {
+ Path dependencyLibTarGzip = getDependencyTarGzip();
+ LocalResource lc = createAmResource(dependencyLibTarGzip,
+ LocalResourceType.ARCHIVE);
+ providerResources.put(YarnServiceConstants.DEPENDENCY_LOCALIZED_DIR_LINK, lc);
+ }
+
+ public void copyLocalFileToHdfs(File localPath,
+ Path destPath, FsPermission fp)
+ throws IOException {
+ if (localPath == null || destPath == null) {
+ throw new IOException("Either localPath or destPath is null");
+ }
+ fileSystem.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,
+ "000");
+ fileSystem.mkdirs(destPath.getParent(), fp);
+ log.info("Copying file {} to {}", localPath.toURI(),
+ fileSystem.getScheme() + ":/" + destPath.toUri());
+
+ fileSystem.copyFromLocalFile(false, true, new Path(localPath.getPath()),
+ destPath);
+ // set file permissions of the destPath
+ fileSystem.setPermission(destPath, fp);
+ }
+
+ public void copyHdfsFileToLocal(Path hdfsPath, File destFile)
+ throws IOException {
+ if (hdfsPath == null || destFile == null) {
+ throw new IOException("Either hdfsPath or destPath is null");
+ }
+ log.info("Copying file {} to {}", hdfsPath.toUri(), destFile.toURI());
+
+ Path destPath = new Path(destFile.getPath());
+ fileSystem.copyToLocalFile(hdfsPath, destPath);
+ }
+
+ /**
+ * list entries in a filesystem directory
+ *
+ * @param path directory
+ * @return a listing, one to a line
+ * @throws IOException
+ */
+ public String listFSDir(Path path) throws IOException {
+ FileStatus[] stats = fileSystem.listStatus(path);
+ StringBuilder builder = new StringBuilder();
+ for (FileStatus stat : stats) {
+ builder.append(stat.getPath().toString())
+ .append("\t")
+ .append(stat.getLen())
+ .append("\n");
+ }
+ return builder.toString();
+ }
+
+ public String cat(Path path) throws IOException {
+ FileStatus status = fileSystem.getFileStatus(path);
+ byte[] b = new byte[(int) status.getLen()];
+ FSDataInputStream in = null;
+ try {
+ in = fileSystem.open(path);
+ int count = in.read(b);
+ return new String(b, 0, count, UTF_8);
+ } finally {
+ IOUtils.closeStream(in);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/727e6d78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/Duration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/Duration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/Duration.java
new file mode 100644
index 0000000..6fadfd3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/Duration.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.utils;
+
+import java.io.Closeable;
+
+/**
+ * A duration in milliseconds. This class can be used
+ * to count time, and to be polled to see if a time limit has
+ * passed.
+ */
+public class Duration implements Closeable {
+ public long start, finish;
+ public final long limit;
+
+ /**
+ * Create a duration instance with a limit of 0
+ */
+ public Duration() {
+ this(0);
+ }
+
+ /**
+ * Create a duration with a limit specified in millis
+ * @param limit duration in milliseconds
+ */
+ public Duration(long limit) {
+ this.limit = limit;
+ }
+
+ /**
+ * Start
+ * @return self
+ */
+ public Duration start() {
+ start = now();
+ return this;
+ }
+
+ /**
+ * The close operation relays to {@link #finish()}.
+ * Implementing it allows Duration instances to be automatically
+ * finish()'d in Java7 try blocks for when used in measuring durations.
+ */
+ @Override
+ public final void close() {
+ finish();
+ }
+
+ public void finish() {
+ finish = now();
+ }
+
+ protected long now() {
+ return System.nanoTime()/1000000;
+ }
+
+ public long getInterval() {
+ return finish - start;
+ }
+
+ /**
+ * return true if the limit has been exceeded
+ * @return true if a limit was set and the current time
+ * exceeds it.
+ */
+ public boolean getLimitExceeded() {
+ return limit >= 0 && ((now() - start) > limit);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("Duration");
+ if (finish >= start) {
+ builder.append(" finished at ").append(getInterval()).append(" millis;");
+ } else {
+ if (start > 0) {
+ builder.append(" started but not yet finished;");
+ } else {
+ builder.append(" unstarted;");
+ }
+ }
+ if (limit > 0) {
+ builder.append(" limit: ").append(limit).append(" millis");
+ if (getLimitExceeded()) {
+ builder.append(" - exceeded");
+ }
+ }
+ return builder.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/727e6d78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/JsonSerDeser.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/JsonSerDeser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/JsonSerDeser.java
new file mode 100644
index 0000000..7b22e3e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/JsonSerDeser.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.utils;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.PropertyNamingStrategy;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Support for marshalling objects to and from JSON.
+ * This class is NOT thread safe; it constructs an object mapper
+ * as an instance field.
+ * @param <T>
+ */
+public class JsonSerDeser<T> {
+
+ private static final Logger log = LoggerFactory.getLogger(JsonSerDeser.class);
+ private static final String UTF_8 = "UTF-8";
+
+ private final Class<T> classType;
+ private final ObjectMapper mapper;
+
+ /**
+ * Create an instance bound to a specific type
+ * @param classType class type
+ */
+ public JsonSerDeser(Class<T> classType) {
+ this.classType = classType;
+ this.mapper = new ObjectMapper();
+ mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ }
+
+ public JsonSerDeser(Class<T> classType, PropertyNamingStrategy namingStrategy) {
+ this(classType);
+ mapper.setPropertyNamingStrategy(namingStrategy);
+ }
+
+ /**
+ * Convert from JSON
+ * @param json input
+ * @return the parsed JSON
+ * @throws IOException IO
+ * @throws JsonMappingException failure to map from the JSON to this class
+ */
+ public T fromJson(String json)
+ throws IOException, JsonParseException, JsonMappingException {
+ try {
+ return mapper.readValue(json, classType);
+ } catch (IOException e) {
+ log.error("Exception while parsing json : " + e + "\n" + json, e);
+ throw e;
+ }
+ }
+
+ /**
+ * Convert from a JSON file
+ * @param jsonFile input file
+ * @return the parsed JSON
+ * @throws IOException IO problems
+ * @throws JsonMappingException failure to map from the JSON to this class
+ */
+ public T fromFile(File jsonFile)
+ throws IOException, JsonParseException, JsonMappingException {
+ File absoluteFile = jsonFile.getAbsoluteFile();
+ try {
+ return mapper.readValue(absoluteFile, classType);
+ } catch (IOException e) {
+ log.error("Exception while parsing json file {}", absoluteFile, e);
+ throw e;
+ }
+ }
+
+ /**
+ * Convert from a JSON file
+ * @param resource input file
+ * @return the parsed JSON
+ * @throws IOException IO problems
+ * @throws JsonMappingException failure to map from the JSON to this class
+ */
+ public T fromResource(String resource)
+ throws IOException, JsonParseException, JsonMappingException {
+ try(InputStream resStream = this.getClass().getResourceAsStream(resource)) {
+ if (resStream == null) {
+ throw new FileNotFoundException(resource);
+ }
+ return (T) (mapper.readValue(resStream, classType));
+ } catch (IOException e) {
+ log.error("Exception while parsing json resource {}", resource, e);
+ throw e;
+ }
+ }
+
+ /**
+ * Convert from an input stream, closing the stream afterwards.
+ * @param stream
+ * @return the parsed JSON
+ * @throws IOException IO problems
+ */
+ public T fromStream(InputStream stream) throws IOException {
+ try {
+ return (T) (mapper.readValue(stream, classType));
+ } catch (IOException e) {
+ log.error("Exception while parsing json input stream", e);
+ throw e;
+ } finally {
+ IOUtils.closeStream(stream);
+ }
+ }
+
+ /**
+ * clone by converting to JSON and back again.
+ * This is much less efficient than any Java clone process.
+ * @param instance instance to duplicate
+ * @return a new instance
+ * @throws IOException problems.
+ */
+ public T fromInstance(T instance) throws IOException {
+ return fromJson(toJson(instance));
+ }
+
+ /**
+ * Deserialize from a byte array
+ * @param b
+ * @return the deserialized value
+ * @throws IOException parse problems
+ */
+ public T fromBytes(byte[] b) throws IOException {
+ String json = new String(b, 0, b.length, UTF_8);
+ return fromJson(json);
+ }
+
+ /**
+ * Load from a Hadoop filesystem
+ * @param fs filesystem
+ * @param path path
+ * @return a loaded CD
+ * @throws IOException IO problems
+ * @throws JsonParseException parse problems
+ * @throws JsonMappingException O/J mapping problems
+ */
+ public T load(FileSystem fs, Path path)
+ throws IOException, JsonParseException, JsonMappingException {
+ FileStatus status = fs.getFileStatus(path);
+ long len = status.getLen();
+ byte[] b = new byte[(int) len];
+ FSDataInputStream dataInputStream = fs.open(path);
+ int count = dataInputStream.read(b);
+ if (count != len) {
+ throw new EOFException("Read of " + path +" finished prematurely");
+ }
+ return fromBytes(b);
+ }
+
+
+ /**
+ * Save to a hadoop filesystem
+ * @param fs filesystem
+ * @param path path
+ * @param instance instance to save
+ * @param overwrite should any existing file be overwritten
+ * @throws IOException IO exception
+ */
+ public void save(FileSystem fs, Path path, T instance,
+ boolean overwrite) throws
+ IOException {
+ FSDataOutputStream dataOutputStream = fs.create(path, overwrite);
+ writeJsonAsBytes(instance, dataOutputStream);
+ }
+
+ /**
+ * Save an instance to a file
+ * @param instance instance to save
+ * @param file file
+ * @throws IOException
+ */
+ public void save(T instance, File file) throws
+ IOException {
+ writeJsonAsBytes(instance, new FileOutputStream(file.getAbsoluteFile()));
+ }
+
+ /**
+ * Write the json as bytes -then close the file
+ * @param dataOutputStream an outout stream that will always be closed
+ * @throws IOException on any failure
+ */
+ private void writeJsonAsBytes(T instance,
+ OutputStream dataOutputStream) throws IOException {
+ try {
+ String json = toJson(instance);
+ byte[] b = json.getBytes(UTF_8);
+ dataOutputStream.write(b);
+ dataOutputStream.flush();
+ dataOutputStream.close();
+ } finally {
+ IOUtils.closeStream(dataOutputStream);
+ }
+ }
+
+ /**
+ * Convert an object to a JSON string
+ * @param instance instance to convert
+ * @return a JSON string description
+ * @throws JsonParseException parse problems
+ * @throws JsonMappingException O/J mapping problems
+ */
+ public String toJson(T instance) throws IOException,
+ JsonGenerationException,
+ JsonMappingException {
+ mapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true);
+ return mapper.writeValueAsString(instance);
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org