You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by zj...@apache.org on 2014/05/28 20:09:05 UTC
svn commit: r1598094 [2/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-common/src/main/resources/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/appl...
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java Wed May 28 18:09:04 2014
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+
+/**
+ * In-memory implementation of {@link TimelineStore}. This
+ * implementation is for test purpose only. If users improperly instantiate it,
+ * they may encounter reading and writing history data in different memory
+ * store.
+ *
+ */
+@Private
+@Unstable
+public class MemoryTimelineStore
+ extends AbstractService implements TimelineStore {
+
+ private Map<EntityIdentifier, TimelineEntity> entities =
+ new HashMap<EntityIdentifier, TimelineEntity>();
+ private Map<EntityIdentifier, Long> entityInsertTimes =
+ new HashMap<EntityIdentifier, Long>();
+
+ public MemoryTimelineStore() {
+ super(MemoryTimelineStore.class.getName());
+ }
+
+ @Override
+ public TimelineEntities getEntities(String entityType, Long limit,
+ Long windowStart, Long windowEnd, String fromId, Long fromTs,
+ NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+ EnumSet<Field> fields) {
+ if (limit == null) {
+ limit = DEFAULT_LIMIT;
+ }
+ if (windowStart == null) {
+ windowStart = Long.MIN_VALUE;
+ }
+ if (windowEnd == null) {
+ windowEnd = Long.MAX_VALUE;
+ }
+ if (fields == null) {
+ fields = EnumSet.allOf(Field.class);
+ }
+
+ Iterator<TimelineEntity> entityIterator = null;
+ if (fromId != null) {
+ TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId,
+ entityType));
+ if (firstEntity == null) {
+ return new TimelineEntities();
+ } else {
+ entityIterator = new TreeSet<TimelineEntity>(entities.values())
+ .tailSet(firstEntity, true).iterator();
+ }
+ }
+ if (entityIterator == null) {
+ entityIterator = new PriorityQueue<TimelineEntity>(entities.values())
+ .iterator();
+ }
+
+ List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();
+ while (entityIterator.hasNext()) {
+ TimelineEntity entity = entityIterator.next();
+ if (entitiesSelected.size() >= limit) {
+ break;
+ }
+ if (!entity.getEntityType().equals(entityType)) {
+ continue;
+ }
+ if (entity.getStartTime() <= windowStart) {
+ continue;
+ }
+ if (entity.getStartTime() > windowEnd) {
+ continue;
+ }
+ if (fromTs != null && entityInsertTimes.get(new EntityIdentifier(
+ entity.getEntityId(), entity.getEntityType())) > fromTs) {
+ continue;
+ }
+ if (primaryFilter != null &&
+ !matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) {
+ continue;
+ }
+ if (secondaryFilters != null) { // AND logic
+ boolean flag = true;
+ for (NameValuePair secondaryFilter : secondaryFilters) {
+ if (secondaryFilter != null && !matchPrimaryFilter(
+ entity.getPrimaryFilters(), secondaryFilter) &&
+ !matchFilter(entity.getOtherInfo(), secondaryFilter)) {
+ flag = false;
+ break;
+ }
+ }
+ if (!flag) {
+ continue;
+ }
+ }
+ entitiesSelected.add(entity);
+ }
+ List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
+ for (TimelineEntity entitySelected : entitiesSelected) {
+ entitiesToReturn.add(maskFields(entitySelected, fields));
+ }
+ Collections.sort(entitiesToReturn);
+ TimelineEntities entitiesWrapper = new TimelineEntities();
+ entitiesWrapper.setEntities(entitiesToReturn);
+ return entitiesWrapper;
+ }
+
+ @Override
+ public TimelineEntity getEntity(String entityId, String entityType,
+ EnumSet<Field> fieldsToRetrieve) {
+ if (fieldsToRetrieve == null) {
+ fieldsToRetrieve = EnumSet.allOf(Field.class);
+ }
+ TimelineEntity entity = entities.get(new EntityIdentifier(entityId, entityType));
+ if (entity == null) {
+ return null;
+ } else {
+ return maskFields(entity, fieldsToRetrieve);
+ }
+ }
+
+ @Override
+ public TimelineEvents getEntityTimelines(String entityType,
+ SortedSet<String> entityIds, Long limit, Long windowStart,
+ Long windowEnd,
+ Set<String> eventTypes) {
+ TimelineEvents allEvents = new TimelineEvents();
+ if (entityIds == null) {
+ return allEvents;
+ }
+ if (limit == null) {
+ limit = DEFAULT_LIMIT;
+ }
+ if (windowStart == null) {
+ windowStart = Long.MIN_VALUE;
+ }
+ if (windowEnd == null) {
+ windowEnd = Long.MAX_VALUE;
+ }
+ for (String entityId : entityIds) {
+ EntityIdentifier entityID = new EntityIdentifier(entityId, entityType);
+ TimelineEntity entity = entities.get(entityID);
+ if (entity == null) {
+ continue;
+ }
+ EventsOfOneEntity events = new EventsOfOneEntity();
+ events.setEntityId(entityId);
+ events.setEntityType(entityType);
+ for (TimelineEvent event : entity.getEvents()) {
+ if (events.getEvents().size() >= limit) {
+ break;
+ }
+ if (event.getTimestamp() <= windowStart) {
+ continue;
+ }
+ if (event.getTimestamp() > windowEnd) {
+ continue;
+ }
+ if (eventTypes != null && !eventTypes.contains(event.getEventType())) {
+ continue;
+ }
+ events.addEvent(event);
+ }
+ allEvents.addEvent(events);
+ }
+ return allEvents;
+ }
+
+ @Override
+ public TimelinePutResponse put(TimelineEntities data) {
+ TimelinePutResponse response = new TimelinePutResponse();
+ for (TimelineEntity entity : data.getEntities()) {
+ EntityIdentifier entityId =
+ new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
+ // store entity info in memory
+ TimelineEntity existingEntity = entities.get(entityId);
+ if (existingEntity == null) {
+ existingEntity = new TimelineEntity();
+ existingEntity.setEntityId(entity.getEntityId());
+ existingEntity.setEntityType(entity.getEntityType());
+ existingEntity.setStartTime(entity.getStartTime());
+ entities.put(entityId, existingEntity);
+ entityInsertTimes.put(entityId, System.currentTimeMillis());
+ }
+ if (entity.getEvents() != null) {
+ if (existingEntity.getEvents() == null) {
+ existingEntity.setEvents(entity.getEvents());
+ } else {
+ existingEntity.addEvents(entity.getEvents());
+ }
+ Collections.sort(existingEntity.getEvents());
+ }
+ // check startTime
+ if (existingEntity.getStartTime() == null) {
+ if (existingEntity.getEvents() == null
+ || existingEntity.getEvents().isEmpty()) {
+ TimelinePutError error = new TimelinePutError();
+ error.setEntityId(entityId.getId());
+ error.setEntityType(entityId.getType());
+ error.setErrorCode(TimelinePutError.NO_START_TIME);
+ response.addError(error);
+ entities.remove(entityId);
+ entityInsertTimes.remove(entityId);
+ continue;
+ } else {
+ Long min = Long.MAX_VALUE;
+ for (TimelineEvent e : entity.getEvents()) {
+ if (min > e.getTimestamp()) {
+ min = e.getTimestamp();
+ }
+ }
+ existingEntity.setStartTime(min);
+ }
+ }
+ if (entity.getPrimaryFilters() != null) {
+ if (existingEntity.getPrimaryFilters() == null) {
+ existingEntity.setPrimaryFilters(new HashMap<String, Set<Object>>());
+ }
+ for (Entry<String, Set<Object>> pf :
+ entity.getPrimaryFilters().entrySet()) {
+ for (Object pfo : pf.getValue()) {
+ existingEntity.addPrimaryFilter(pf.getKey(), maybeConvert(pfo));
+ }
+ }
+ }
+ if (entity.getOtherInfo() != null) {
+ if (existingEntity.getOtherInfo() == null) {
+ existingEntity.setOtherInfo(new HashMap<String, Object>());
+ }
+ for (Entry<String, Object> info : entity.getOtherInfo().entrySet()) {
+ existingEntity.addOtherInfo(info.getKey(),
+ maybeConvert(info.getValue()));
+ }
+ }
+ // relate it to other entities
+ if (entity.getRelatedEntities() == null) {
+ continue;
+ }
+ for (Map.Entry<String, Set<String>> partRelatedEntities : entity
+ .getRelatedEntities().entrySet()) {
+ if (partRelatedEntities == null) {
+ continue;
+ }
+ for (String idStr : partRelatedEntities.getValue()) {
+ EntityIdentifier relatedEntityId =
+ new EntityIdentifier(idStr, partRelatedEntities.getKey());
+ TimelineEntity relatedEntity = entities.get(relatedEntityId);
+ if (relatedEntity != null) {
+ relatedEntity.addRelatedEntity(
+ existingEntity.getEntityType(), existingEntity.getEntityId());
+ } else {
+ relatedEntity = new TimelineEntity();
+ relatedEntity.setEntityId(relatedEntityId.getId());
+ relatedEntity.setEntityType(relatedEntityId.getType());
+ relatedEntity.setStartTime(existingEntity.getStartTime());
+ relatedEntity.addRelatedEntity(existingEntity.getEntityType(),
+ existingEntity.getEntityId());
+ entities.put(relatedEntityId, relatedEntity);
+ entityInsertTimes.put(relatedEntityId, System.currentTimeMillis());
+ }
+ }
+ }
+ }
+ return response;
+ }
+
+ private static TimelineEntity maskFields(
+ TimelineEntity entity, EnumSet<Field> fields) {
+ // Conceal the fields that are not going to be exposed
+ TimelineEntity entityToReturn = new TimelineEntity();
+ entityToReturn.setEntityId(entity.getEntityId());
+ entityToReturn.setEntityType(entity.getEntityType());
+ entityToReturn.setStartTime(entity.getStartTime());
+ // Deep copy
+ if (fields.contains(Field.EVENTS)) {
+ entityToReturn.addEvents(entity.getEvents());
+ } else if (fields.contains(Field.LAST_EVENT_ONLY)) {
+ entityToReturn.addEvent(entity.getEvents().get(0));
+ } else {
+ entityToReturn.setEvents(null);
+ }
+ if (fields.contains(Field.RELATED_ENTITIES)) {
+ entityToReturn.addRelatedEntities(entity.getRelatedEntities());
+ } else {
+ entityToReturn.setRelatedEntities(null);
+ }
+ if (fields.contains(Field.PRIMARY_FILTERS)) {
+ entityToReturn.addPrimaryFilters(entity.getPrimaryFilters());
+ } else {
+ entityToReturn.setPrimaryFilters(null);
+ }
+ if (fields.contains(Field.OTHER_INFO)) {
+ entityToReturn.addOtherInfo(entity.getOtherInfo());
+ } else {
+ entityToReturn.setOtherInfo(null);
+ }
+ return entityToReturn;
+ }
+
+ private static boolean matchFilter(Map<String, Object> tags,
+ NameValuePair filter) {
+ Object value = tags.get(filter.getName());
+ if (value == null) { // doesn't have the filter
+ return false;
+ } else if (!value.equals(filter.getValue())) { // doesn't match the filter
+ return false;
+ }
+ return true;
+ }
+
+ private static boolean matchPrimaryFilter(Map<String, Set<Object>> tags,
+ NameValuePair filter) {
+ Set<Object> value = tags.get(filter.getName());
+ if (value == null) { // doesn't have the filter
+ return false;
+ } else {
+ return value.contains(filter.getValue());
+ }
+ }
+
+ private static Object maybeConvert(Object o) {
+ if (o instanceof Long) {
+ Long l = (Long)o;
+ if (l >= Integer.MIN_VALUE && l <= Integer.MAX_VALUE) {
+ return l.intValue();
+ }
+ }
+ return o;
+ }
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/NameValuePair.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/NameValuePair.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/NameValuePair.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/NameValuePair.java Wed May 28 18:09:04 2014
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A class holding a name and value pair, used for specifying filters in
+ * {@link TimelineReader}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class NameValuePair {
+ String name;
+ Object value;
+
+ public NameValuePair(String name, Object value) {
+ this.name = name;
+ this.value = value;
+ }
+
+ /**
+ * Get the name.
+ * @return The name.
+ */
+ public String getName() {
+
+ return name;
+ }
+
+ /**
+ * Get the value.
+ * @return The value.
+ */
+ public Object getValue() {
+ return value;
+ }
+
+ @Override
+ public String toString() {
+ return "{ name: " + name + ", value: " + value + " }";
+ }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineReader.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineReader.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineReader.java Wed May 28 18:09:04 2014
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.SortedSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+
+/**
+ * This interface is for retrieving timeline information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface TimelineReader {
+
+ /**
+ * Possible fields to retrieve for {@link #getEntities} and {@link #getEntity}
+ * .
+ */
+ enum Field {
+ EVENTS,
+ RELATED_ENTITIES,
+ PRIMARY_FILTERS,
+ OTHER_INFO,
+ LAST_EVENT_ONLY
+ }
+
+ /**
+ * Default limit for {@link #getEntities} and {@link #getEntityTimelines}.
+ */
+ final long DEFAULT_LIMIT = 100;
+
+ /**
+ * This method retrieves a list of entity information, {@link TimelineEntity},
+ * sorted by the starting timestamp for the entity, descending. The starting
+ * timestamp of an entity is a timestamp specified by the client. If it is not
+ * explicitly specified, it will be chosen by the store to be the earliest
+ * timestamp of the events received in the first put for the entity.
+ *
+ * @param entityType
+ * The type of entities to return (required).
+ * @param limit
+ * A limit on the number of entities to return. If null, defaults to
+ * {@link #DEFAULT_LIMIT}.
+ * @param windowStart
+ * The earliest start timestamp to retrieve (exclusive). If null,
+ * defaults to retrieving all entities until the limit is reached.
+ * @param windowEnd
+ * The latest start timestamp to retrieve (inclusive). If null,
+ * defaults to {@link Long#MAX_VALUE}
+ * @param fromId
+ * If fromId is not null, retrieve entities earlier than and
+ * including the specified ID. If no start time is found for the
+ * specified ID, an empty list of entities will be returned. The
+ * windowEnd parameter will take precedence if the start time of this
+ * entity falls later than windowEnd.
+ * @param fromTs
+ * If fromTs is not null, ignore entities that were inserted into the
+ * store after the given timestamp. The entity's insert timestamp
+ * used for this comparison is the store's system time when the first
+ * put for the entity was received (not the entity's start time).
+ * @param primaryFilter
+ * Retrieves only entities that have the specified primary filter. If
+ * null, retrieves all entities. This is an indexed retrieval, and no
+ * entities that do not match the filter are scanned.
+ * @param secondaryFilters
+ * Retrieves only entities that have exact matches for all the
+ * specified filters in their primary filters or other info. This is
+ * not an indexed retrieval, so all entities are scanned but only
+ * those matching the filters are returned.
+ * @param fieldsToRetrieve
+ * Specifies which fields of the entity object to retrieve (see
+ * {@link Field}). If the set of fields contains
+ * {@link Field#LAST_EVENT_ONLY} and not {@link Field#EVENTS}, the
+ * most recent event for each entity is retrieved. If null, retrieves
+ * all fields.
+ * @return An {@link TimelineEntities} object.
+ * @throws IOException
+ */
+ TimelineEntities getEntities(String entityType,
+ Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs,
+ NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+ EnumSet<Field> fieldsToRetrieve) throws IOException;
+
+ /**
+ * This method retrieves the entity information for a given entity.
+ *
+ * @param entityId
+ * The entity whose information will be retrieved.
+ * @param entityType
+ * The type of the entity.
+ * @param fieldsToRetrieve
+ * Specifies which fields of the entity object to retrieve (see
+ * {@link Field}). If the set of fields contains
+ * {@link Field#LAST_EVENT_ONLY} and not {@link Field#EVENTS}, the
+ * most recent event for each entity is retrieved. If null, retrieves
+ * all fields.
+ * @return An {@link TimelineEntity} object.
+ * @throws IOException
+ */
+ TimelineEntity getEntity(String entityId, String entityType, EnumSet<Field>
+ fieldsToRetrieve) throws IOException;
+
+ /**
+ * This method retrieves the events for a list of entities all of the same
+ * entity type. The events for each entity are sorted in order of their
+ * timestamps, descending.
+ *
+ * @param entityType
+ * The type of entities to retrieve events for.
+ * @param entityIds
+ * The entity IDs to retrieve events for.
+ * @param limit
+ * A limit on the number of events to return for each entity. If
+ * null, defaults to {@link #DEFAULT_LIMIT} events per entity.
+ * @param windowStart
+ * If not null, retrieves only events later than the given time
+ * (exclusive)
+ * @param windowEnd
+ * If not null, retrieves only events earlier than the given time
+ * (inclusive)
+ * @param eventTypes
+ * Restricts the events returned to the given types. If null, events
+ * of all types will be returned.
+ * @return An {@link TimelineEvents} object.
+ * @throws IOException
+ */
+ TimelineEvents getEntityTimelines(String entityType,
+ SortedSet<String> entityIds, Long limit, Long windowStart,
+ Long windowEnd, Set<String> eventTypes) throws IOException;
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStore.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStore.java Wed May 28 18:09:04 2014
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+
/**
 * The complete contract for a timeline storage backend: a YARN
 * {@link Service} that supports both reading ({@link TimelineReader}) and
 * writing ({@link TimelineWriter}) timeline data.
 */
@Private
@Unstable
public interface TimelineStore extends
    Service, TimelineReader, TimelineWriter {

  /**
   * The system filter which will be automatically added to a
   * {@link TimelineEntity}'s primary filter section when storing the entity.
   * The filter key is case sensitive. Users are supposed not to use the key
   * reserved by the timeline system.
   */
  @Private
  enum SystemFilter {
    ENTITY_OWNER
  }

}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineWriter.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineWriter.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineWriter.java Wed May 28 18:09:04 2014
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
+import java.io.IOException;
+
/**
 * This interface is for storing timeline information.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface TimelineWriter {

  /**
   * Stores entity information to the timeline store. Any errors occurring for
   * individual put request objects will be reported in the response rather
   * than thrown.
   *
   * @param data
   *          A {@link TimelineEntities} object.
   * @return A {@link TimelinePutResponse} object carrying per-entity errors.
   * @throws IOException if the underlying store cannot be written to
   */
  TimelinePutResponse put(TimelineEntities data) throws IOException;

}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/package-info.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/package-info.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/package-info.java Wed May 28 18:09:04 2014
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
/**
 * Server-side storage and retrieval of YARN timeline data: the
 * {@code TimelineStore} contract, its reader/writer halves, and supporting
 * classes such as the in-memory test implementation.
 */
@InterfaceAudience.Private
package org.apache.hadoop.yarn.server.timeline;
import org.apache.hadoop.classification.InterfaceAudience;
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineACLsManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineACLsManager.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineACLsManager.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineACLsManager.java Wed May 28 18:09:04 2014
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline.security;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AdminACLsManager;
+import org.apache.hadoop.yarn.server.timeline.EntityIdentifier;
+import org.apache.hadoop.yarn.server.timeline.TimelineStore.SystemFilter;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * <code>TimelineACLsManager</code> check the entity level timeline data access.
+ */
+@Private
+public class TimelineACLsManager {
+
+ private static final Log LOG = LogFactory.getLog(TimelineACLsManager.class);
+
+ private AdminACLsManager adminAclsManager;
+
+ public TimelineACLsManager(Configuration conf) {
+ this.adminAclsManager = new AdminACLsManager(conf);
+ }
+
+ public boolean checkAccess(UserGroupInformation callerUGI,
+ TimelineEntity entity) throws YarnException, IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Verifying the access of " + callerUGI.getShortUserName()
+ + " on the timeline entity "
+ + new EntityIdentifier(entity.getEntityId(), entity.getEntityType()));
+ }
+
+ if (!adminAclsManager.areACLsEnabled()) {
+ return true;
+ }
+
+ Set<Object> values =
+ entity.getPrimaryFilters().get(
+ SystemFilter.ENTITY_OWNER.toString());
+ if (values == null || values.size() != 1) {
+ throw new YarnException("Owner information of the timeline entity "
+ + new EntityIdentifier(entity.getEntityId(), entity.getEntityType())
+ + " is corrupted.");
+ }
+ String owner = values.iterator().next().toString();
+ // TODO: Currently we just check the user is the admin or the timeline
+ // entity owner. In the future, we need to check whether the user is in the
+ // allowed user/group list
+ if (callerUGI != null
+ && (adminAclsManager.isAdmin(callerUGI) ||
+ callerUGI.getShortUserName().equals(owner))) {
+ return true;
+ }
+ return false;
+ }
+
+ @Private
+ @VisibleForTesting
+ public AdminACLsManager
+ setAdminACLsManager(AdminACLsManager adminAclsManager) {
+ AdminACLsManager oldAdminACLsManager = this.adminAclsManager;
+ this.adminAclsManager = adminAclsManager;
+ return oldAdminACLsManager;
+ }
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilter.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilter.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilter.java Wed May 28 18:09:04 2014
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline.security;
+
+import java.util.Properties;
+
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
+
+@Private
+@Unstable
+public class TimelineAuthenticationFilter extends AuthenticationFilter {
+
+ @Override
+ protected Properties getConfiguration(String configPrefix,
+ FilterConfig filterConfig) throws ServletException {
+ // In yarn-site.xml, we can simply set type to "kerberos". However, we need
+ // to replace the name here to use the customized Kerberos + DT service
+ // instead of the standard Kerberos handler.
+ Properties properties = super.getConfiguration(configPrefix, filterConfig);
+ if (properties.getProperty(AUTH_TYPE).equals("kerberos")) {
+ properties.setProperty(
+ AUTH_TYPE, TimelineClientAuthenticationService.class.getName());
+ }
+ return properties;
+ }
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilterInitializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilterInitializer.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilterInitializer.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilterInitializer.java Wed May 28 18:09:04 2014
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline.security;
+
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.FilterContainer;
+import org.apache.hadoop.http.FilterInitializer;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.security.SecurityUtil;
+
+/**
+ * <p>
+ * Initializes {@link TimelineAuthenticationFilter} which provides support for
+ * Kerberos HTTP SPNEGO authentication.
+ * <p/>
+ * <p>
+ * It enables Kerberos HTTP SPNEGO plus delegation token authentication for the
+ * timeline server.
+ * <p/>
+ * Refer to the <code>core-default.xml</code> file, after the comment 'HTTP
+ * Authentication' for details on the configuration options. All related
+ * configuration properties have 'hadoop.http.authentication.' as prefix.
+ */
+public class TimelineAuthenticationFilterInitializer extends FilterInitializer {
+
+ /**
+ * The configuration prefix of timeline Kerberos + DT authentication
+ */
+ public static final String PREFIX = "yarn.timeline-service.http.authentication.";
+
+ private static final String SIGNATURE_SECRET_FILE =
+ TimelineAuthenticationFilter.SIGNATURE_SECRET + ".file";
+
+ /**
+ * <p>
+ * Initializes {@link TimelineAuthenticationFilter}
+ * <p/>
+ * <p>
+ * Propagates to {@link TimelineAuthenticationFilter} configuration all YARN
+ * configuration properties prefixed with
+ * "yarn.timeline-service.authentication."
+ * </p>
+ *
+ * @param container
+ * The filter container
+ * @param conf
+ * Configuration for run-time parameters
+ */
+ @Override
+ public void initFilter(FilterContainer container, Configuration conf) {
+ Map<String, String> filterConfig = new HashMap<String, String>();
+
+ // setting the cookie path to root '/' so it is used for all resources.
+ filterConfig.put(TimelineAuthenticationFilter.COOKIE_PATH, "/");
+
+ for (Map.Entry<String, String> entry : conf) {
+ String name = entry.getKey();
+ if (name.startsWith(PREFIX)) {
+ String value = conf.get(name);
+ name = name.substring(PREFIX.length());
+ filterConfig.put(name, value);
+ }
+ }
+
+ String signatureSecretFile = filterConfig.get(SIGNATURE_SECRET_FILE);
+ if (signatureSecretFile != null) {
+ try {
+ StringBuilder secret = new StringBuilder();
+ Reader reader = new FileReader(signatureSecretFile);
+ int c = reader.read();
+ while (c > -1) {
+ secret.append((char) c);
+ c = reader.read();
+ }
+ reader.close();
+ filterConfig
+ .put(TimelineAuthenticationFilter.SIGNATURE_SECRET,
+ secret.toString());
+ } catch (IOException ex) {
+ throw new RuntimeException(
+ "Could not read HTTP signature secret file: "
+ + signatureSecretFile);
+ }
+ }
+
+ // Resolve _HOST into bind address
+ String bindAddress = conf.get(HttpServer2.BIND_ADDRESS);
+ String principal =
+ filterConfig.get(TimelineClientAuthenticationService.PRINCIPAL);
+ if (principal != null) {
+ try {
+ principal = SecurityUtil.getServerPrincipal(principal, bindAddress);
+ } catch (IOException ex) {
+ throw new RuntimeException(
+ "Could not resolve Kerberos principal name: " + ex.toString(), ex);
+ }
+ filterConfig.put(TimelineClientAuthenticationService.PRINCIPAL,
+ principal);
+ }
+
+ container.addGlobalFilter("Timeline Authentication Filter",
+ TimelineAuthenticationFilter.class.getName(),
+ filterConfig);
+ }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineClientAuthenticationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineClientAuthenticationService.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineClientAuthenticationService.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineClientAuthenticationService.java Wed May 28 18:09:04 2014
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline.security;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.text.MessageFormat;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.authentication.server.AuthenticationToken;
+import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDelegationTokenResponse;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenOperation;
+import org.apache.hadoop.yarn.security.client.TimelineAuthenticationConsts;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.codehaus.jackson.map.ObjectMapper;
+
/**
 * Server side <code>AuthenticationHandler</code> that authenticates requests
 * using the incoming delegation token as a 'delegation' query string parameter.
 * <p/>
 * If no delegation token is present in the request, it delegates to the
 * {@link KerberosAuthenticationHandler}.
 */
@Private
@Unstable
public class TimelineClientAuthenticationService
    extends KerberosAuthenticationHandler {

  public static final String TYPE = "kerberos-dt";
  // String names of the three delegation-token management operations,
  // populated once in the static initializer below.
  private static final Set<String> DELEGATION_TOKEN_OPS = new HashSet<String>();
  private static final String OP_PARAM = "op";
  // Platform line separator, appended after each JSON response body.
  private static final String ENTER = System.getProperty("line.separator");

  // Jackson mapper used to serialize token responses as JSON.
  private ObjectMapper mapper;

  static {
    DELEGATION_TOKEN_OPS.add(
        TimelineDelegationTokenOperation.GETDELEGATIONTOKEN.toString());
    DELEGATION_TOKEN_OPS.add(
        TimelineDelegationTokenOperation.RENEWDELEGATIONTOKEN.toString());
    DELEGATION_TOKEN_OPS.add(
        TimelineDelegationTokenOperation.CANCELDELEGATIONTOKEN.toString());
  }

  public TimelineClientAuthenticationService() {
    super();
    mapper = new ObjectMapper();
    YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
  }

  /**
   * Returns authentication type of the handler.
   *
   * @return <code>kerberos-dt</code>
   */
  @Override
  public String getType() {
    return TYPE;
  }

  /**
   * Intercepts delegation-token management requests (get/renew/cancel,
   * selected by the "op" query parameter) and serves them directly, writing a
   * JSON response. Any other request passes through untouched.
   *
   * @param token the SPNEGO-established authentication token, or null if the
   *          caller has not authenticated via Kerberos
   * @param request the HTTP request
   * @param response the HTTP response
   * @return true when the request should continue to the filter chain
   *         (i.e. it was not a token-management operation handled here);
   *         false when this method already produced the response
   * @throws IOException on response-writing errors
   * @throws AuthenticationException wraps IOExceptions from the secret manager
   */
  @Override
  public boolean managementOperation(AuthenticationToken token,
      HttpServletRequest request, HttpServletResponse response)
      throws IOException, AuthenticationException {
    boolean requestContinues = true;
    String op = request.getParameter(OP_PARAM);
    op = (op != null) ? op.toUpperCase() : null;
    // OPTIONS (CORS preflight) requests are never treated as token ops.
    if (DELEGATION_TOKEN_OPS.contains(op) &&
        !request.getMethod().equals("OPTIONS")) {
      TimelineDelegationTokenOperation dtOp =
          TimelineDelegationTokenOperation.valueOf(op);
      if (dtOp.getHttpMethod().equals(request.getMethod())) {
        // Getting a token requires an established SPNEGO identity.
        if (dtOp.requiresKerberosCredentials() && token == null) {
          response.sendError(HttpServletResponse.SC_UNAUTHORIZED,
              MessageFormat.format(
                  "Operation [{0}] requires SPNEGO authentication established",
                  dtOp));
          requestContinues = false;
        } else {
          TimelineDelegationTokenSecretManagerService secretManager =
              AHSWebApp.getInstance()
                  .getTimelineDelegationTokenSecretManagerService();
          try {
            // res stays null when the operation produces no JSON body
            // (e.g. a successful cancel).
            TimelineDelegationTokenResponse res = null;
            switch (dtOp) {
              case GETDELEGATIONTOKEN:
                UserGroupInformation ownerUGI =
                    UserGroupInformation.createRemoteUser(token.getUserName());
                // Default the renewer to the requester when not specified.
                String renewerParam =
                    request
                        .getParameter(TimelineAuthenticationConsts.RENEWER_PARAM);
                if (renewerParam == null) {
                  renewerParam = token.getUserName();
                }
                Token<?> dToken =
                    secretManager.createToken(ownerUGI, renewerParam);
                res = new TimelineDelegationTokenResponse();
                res.setType(TimelineAuthenticationConsts.DELEGATION_TOKEN_URL);
                res.setContent(dToken.encodeToUrlString());
                break;
              // Renew and cancel share the token-parameter validation below.
              case RENEWDELEGATIONTOKEN:
              case CANCELDELEGATIONTOKEN:
                String tokenParam =
                    request
                        .getParameter(TimelineAuthenticationConsts.TOKEN_PARAM);
                if (tokenParam == null) {
                  response.sendError(HttpServletResponse.SC_BAD_REQUEST,
                      MessageFormat
                          .format(
                              "Operation [{0}] requires the parameter [{1}]",
                              dtOp,
                              TimelineAuthenticationConsts.TOKEN_PARAM));
                  requestContinues = false;
                } else {
                  if (dtOp == TimelineDelegationTokenOperation.CANCELDELEGATIONTOKEN) {
                    Token<TimelineDelegationTokenIdentifier> dt =
                        new Token<TimelineDelegationTokenIdentifier>();
                    dt.decodeFromUrlString(tokenParam);
                    secretManager.cancelToken(dt, token.getUserName());
                  } else {
                    Token<TimelineDelegationTokenIdentifier> dt =
                        new Token<TimelineDelegationTokenIdentifier>();
                    dt.decodeFromUrlString(tokenParam);
                    // Renewal returns the new expiration time to the caller.
                    long expirationTime =
                        secretManager.renewToken(dt, token.getUserName());
                    res = new TimelineDelegationTokenResponse();
                    res.setType(TimelineAuthenticationConsts.DELEGATION_TOKEN_EXPIRATION_TIME);
                    res.setContent(expirationTime);
                  }
                }
                break;
            }
            // Only write a success response if no error was sent above.
            if (requestContinues) {
              response.setStatus(HttpServletResponse.SC_OK);
              if (res != null) {
                response.setContentType(MediaType.APPLICATION_JSON);
                Writer writer = response.getWriter();
                mapper.writeValue(writer, res);
                writer.write(ENTER);
                writer.flush();

              }
              // The operation was fully handled here; stop the filter chain.
              requestContinues = false;
            }
          } catch (IOException e) {
            throw new AuthenticationException(e.toString(), e);
          }
        }
      } else {
        // A recognized operation was requested with the wrong HTTP verb.
        response
            .sendError(
                HttpServletResponse.SC_BAD_REQUEST,
                MessageFormat
                    .format(
                        "Wrong HTTP method [{0}] for operation [{1}], it should be [{2}]",
                        request.getMethod(), dtOp, dtOp.getHttpMethod()));
        requestContinues = false;
      }
    }
    return requestContinues;
  }

  /**
   * Authenticates a request looking for the <code>delegation</code>
   * query-string parameter and verifying it is a valid token. If there is no
   * <code>delegation</code> query-string parameter, it delegates the
   * authentication to the {@link KerberosAuthenticationHandler} unless it is
   * disabled.
   *
   * @param request
   *          the HTTP client request.
   * @param response
   *          the HTTP client response.
   *
   * @return the authentication token for the authenticated request.
   * @throws IOException
   *           thrown if an IO error occurred.
   * @throws AuthenticationException
   *           thrown if the authentication failed.
   */
  @Override
  public AuthenticationToken authenticate(HttpServletRequest request,
      HttpServletResponse response)
      throws IOException, AuthenticationException {
    AuthenticationToken token;
    String delegationParam =
        request
            .getParameter(TimelineAuthenticationConsts.DELEGATION_PARAM);
    if (delegationParam != null) {
      Token<TimelineDelegationTokenIdentifier> dt =
          new Token<TimelineDelegationTokenIdentifier>();
      dt.decodeFromUrlString(delegationParam);
      TimelineDelegationTokenSecretManagerService secretManager =
          AHSWebApp.getInstance()
              .getTimelineDelegationTokenSecretManagerService();
      UserGroupInformation ugi = secretManager.verifyToken(dt);
      final String shortName = ugi.getShortUserName();
      // creating a ephemeral token: expires=0 means it is valid only for
      // this request and will not be cached via a signed cookie lifetime.
      token = new AuthenticationToken(shortName, ugi.getUserName(), getType());
      token.setExpires(0);
    } else {
      // No delegation token present: fall back to SPNEGO/Kerberos.
      token = super.authenticate(request, response);
    }
    return token;
  }

}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineDelegationTokenSecretManagerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineDelegationTokenSecretManagerService.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineDelegationTokenSecretManagerService.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineDelegationTokenSecretManagerService.java Wed May 28 18:09:04 2014
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+/**
+ * The service wrapper of {@link TimelineDelegationTokenSecretManager}
+ */
+@Private
+@Unstable
+public class TimelineDelegationTokenSecretManagerService extends AbstractService {
+
+ private TimelineDelegationTokenSecretManager secretManager = null;
+ private InetSocketAddress serviceAddr = null;
+
+ public TimelineDelegationTokenSecretManagerService() {
+ super(TimelineDelegationTokenSecretManagerService.class.getName());
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ long secretKeyInterval =
+ conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY,
+ YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
+ long tokenMaxLifetime =
+ conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY,
+ YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
+ long tokenRenewInterval =
+ conf.getLong(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+ YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+ secretManager = new TimelineDelegationTokenSecretManager(secretKeyInterval,
+ tokenMaxLifetime, tokenRenewInterval,
+ 3600000);
+ secretManager.startThreads();
+
+ serviceAddr = TimelineUtils.getTimelineTokenServiceAddress(getConfig());
+ super.init(conf);
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ secretManager.stopThreads();
+ super.stop();
+ }
+
+ /**
+ * Creates a delegation token.
+ *
+ * @param ugi UGI creating the token.
+ * @param renewer token renewer.
+ * @return new delegation token.
+ * @throws IOException thrown if the token could not be created.
+ */
+ public Token<TimelineDelegationTokenIdentifier> createToken(
+ UserGroupInformation ugi, String renewer) throws IOException {
+ renewer = (renewer == null) ? ugi.getShortUserName() : renewer;
+ String user = ugi.getUserName();
+ Text owner = new Text(user);
+ Text realUser = null;
+ if (ugi.getRealUser() != null) {
+ realUser = new Text(ugi.getRealUser().getUserName());
+ }
+ TimelineDelegationTokenIdentifier tokenIdentifier =
+ new TimelineDelegationTokenIdentifier(owner, new Text(renewer), realUser);
+ Token<TimelineDelegationTokenIdentifier> token =
+ new Token<TimelineDelegationTokenIdentifier>(tokenIdentifier, secretManager);
+ SecurityUtil.setTokenService(token, serviceAddr);
+ return token;
+ }
+
+ /**
+ * Renews a delegation token.
+ *
+ * @param token delegation token to renew.
+ * @param renewer token renewer.
+ * @throws IOException thrown if the token could not be renewed.
+ */
+ public long renewToken(Token<TimelineDelegationTokenIdentifier> token,
+ String renewer) throws IOException {
+ return secretManager.renewToken(token, renewer);
+ }
+
+ /**
+ * Cancels a delegation token.
+ *
+ * @param token delegation token to cancel.
+ * @param canceler token canceler.
+ * @throws IOException thrown if the token could not be canceled.
+ */
+ public void cancelToken(Token<TimelineDelegationTokenIdentifier> token,
+ String canceler) throws IOException {
+ secretManager.cancelToken(token, canceler);
+ }
+
+ /**
+ * Verifies a delegation token.
+ *
+ * @param token delegation token to verify.
+ * @return the UGI for the token.
+ * @throws IOException thrown if the token could not be verified.
+ */
+ public UserGroupInformation verifyToken(Token<TimelineDelegationTokenIdentifier> token)
+ throws IOException {
+ ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+ DataInputStream dis = new DataInputStream(buf);
+ TimelineDelegationTokenIdentifier id = new TimelineDelegationTokenIdentifier();
+ try {
+ id.readFields(dis);
+ secretManager.verifyToken(id, token.getPassword());
+ } finally {
+ dis.close();
+ }
+ return id.getUser();
+ }
+
+ /**
+ * Create a timeline secret manager
+ *
+ * @param delegationKeyUpdateInterval
+ * the number of seconds for rolling new secret keys.
+ * @param delegationTokenMaxLifetime
+ * the maximum lifetime of the delegation tokens
+ * @param delegationTokenRenewInterval
+ * how often the tokens must be renewed
+ * @param delegationTokenRemoverScanInterval
+ * how often the tokens are scanned for expired tokens
+ */
+ @Private
+ @Unstable
+ public static class TimelineDelegationTokenSecretManager extends
+ AbstractDelegationTokenSecretManager<TimelineDelegationTokenIdentifier> {
+
+ public TimelineDelegationTokenSecretManager(long delegationKeyUpdateInterval,
+ long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
+ long delegationTokenRemoverScanInterval) {
+ super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+ delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+ }
+
+ @Override
+ public TimelineDelegationTokenIdentifier createIdentifier() {
+ return new TimelineDelegationTokenIdentifier();
+ }
+
+ }
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java Wed May 28 18:09:04 2014
@@ -0,0 +1,539 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline.webapp;
+
import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.timeline.EntityIdentifier;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException;

import com.google.inject.Inject;
import com.google.inject.Singleton;
+
+@Singleton
+@Path("/ws/v1/timeline")
+//TODO: support XML serialization/deserialization
+public class TimelineWebServices {
+
+ private static final Log LOG = LogFactory.getLog(TimelineWebServices.class);
+
+ private TimelineStore store;
+ private TimelineACLsManager timelineACLsManager;
+
+ @Inject
+ public TimelineWebServices(TimelineStore store,
+ TimelineACLsManager timelineACLsManager) {
+ this.store = store;
+ this.timelineACLsManager = timelineACLsManager;
+ }
+
+ @XmlRootElement(name = "about")
+ @XmlAccessorType(XmlAccessType.NONE)
+ @Public
+ @Unstable
+ public static class AboutInfo {
+
+ private String about;
+
+ public AboutInfo() {
+
+ }
+
+ public AboutInfo(String about) {
+ this.about = about;
+ }
+
+ @XmlElement(name = "About")
+ public String getAbout() {
+ return about;
+ }
+
+ public void setAbout(String about) {
+ this.about = about;
+ }
+
+ }
+
+ /**
+ * Return the description of the timeline web services.
+ */
+ @GET
+ @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ public AboutInfo about(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res) {
+ init(res);
+ return new AboutInfo("Timeline API");
+ }
+
+ /**
+ * Return a list of entities that match the given parameters.
+ */
+ @GET
+ @Path("/{entityType}")
+ @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ public TimelineEntities getEntities(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @PathParam("entityType") String entityType,
+ @QueryParam("primaryFilter") String primaryFilter,
+ @QueryParam("secondaryFilter") String secondaryFilter,
+ @QueryParam("windowStart") String windowStart,
+ @QueryParam("windowEnd") String windowEnd,
+ @QueryParam("fromId") String fromId,
+ @QueryParam("fromTs") String fromTs,
+ @QueryParam("limit") String limit,
+ @QueryParam("fields") String fields) {
+ init(res);
+ TimelineEntities entities = null;
+ try {
+ EnumSet<Field> fieldEnums = parseFieldsStr(fields, ",");
+ boolean modified = extendFields(fieldEnums);
+ UserGroupInformation callerUGI = getUser(req);
+ entities = store.getEntities(
+ parseStr(entityType),
+ parseLongStr(limit),
+ parseLongStr(windowStart),
+ parseLongStr(windowEnd),
+ parseStr(fromId),
+ parseLongStr(fromTs),
+ parsePairStr(primaryFilter, ":"),
+ parsePairsStr(secondaryFilter, ",", ":"),
+ fieldEnums);
+ if (entities != null) {
+ Iterator<TimelineEntity> entitiesItr =
+ entities.getEntities().iterator();
+ while (entitiesItr.hasNext()) {
+ TimelineEntity entity = entitiesItr.next();
+ try {
+ // check ACLs
+ if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
+ entitiesItr.remove();
+ } else {
+ // clean up system data
+ if (modified) {
+ entity.setPrimaryFilters(null);
+ } else {
+ cleanupOwnerInfo(entity);
+ }
+ }
+ } catch (YarnException e) {
+ LOG.error("Error when verifying access for user " + callerUGI
+ + " on the events of the timeline entity "
+ + new EntityIdentifier(entity.getEntityId(),
+ entity.getEntityType()), e);
+ entitiesItr.remove();
+ }
+ }
+ }
+ } catch (NumberFormatException e) {
+ throw new BadRequestException(
+ "windowStart, windowEnd or limit is not a numeric value.");
+ } catch (IllegalArgumentException e) {
+ throw new BadRequestException("requested invalid field.");
+ } catch (IOException e) {
+ LOG.error("Error getting entities", e);
+ throw new WebApplicationException(e,
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ if (entities == null) {
+ return new TimelineEntities();
+ }
+ return entities;
+ }
+
+ /**
+ * Return a single entity of the given entity type and Id.
+ */
+ @GET
+ @Path("/{entityType}/{entityId}")
+ @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ public TimelineEntity getEntity(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @PathParam("entityType") String entityType,
+ @PathParam("entityId") String entityId,
+ @QueryParam("fields") String fields) {
+ init(res);
+ TimelineEntity entity = null;
+ try {
+ EnumSet<Field> fieldEnums = parseFieldsStr(fields, ",");
+ boolean modified = extendFields(fieldEnums);
+ entity =
+ store.getEntity(parseStr(entityId), parseStr(entityType),
+ fieldEnums);
+ if (entity != null) {
+ // check ACLs
+ UserGroupInformation callerUGI = getUser(req);
+ if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
+ entity = null;
+ } else {
+ // clean up the system data
+ if (modified) {
+ entity.setPrimaryFilters(null);
+ } else {
+ cleanupOwnerInfo(entity);
+ }
+ }
+ }
+ } catch (IllegalArgumentException e) {
+ throw new BadRequestException(
+ "requested invalid field.");
+ } catch (IOException e) {
+ LOG.error("Error getting entity", e);
+ throw new WebApplicationException(e,
+ Response.Status.INTERNAL_SERVER_ERROR);
+ } catch (YarnException e) {
+ LOG.error("Error getting entity", e);
+ throw new WebApplicationException(e,
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ if (entity == null) {
+ throw new NotFoundException("Timeline entity "
+ + new EntityIdentifier(parseStr(entityId), parseStr(entityType))
+ + " is not found");
+ }
+ return entity;
+ }
+
+ /**
+ * Return the events that match the given parameters.
+ */
+ @GET
+ @Path("/{entityType}/events")
+ @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ public TimelineEvents getEvents(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @PathParam("entityType") String entityType,
+ @QueryParam("entityId") String entityId,
+ @QueryParam("eventType") String eventType,
+ @QueryParam("windowStart") String windowStart,
+ @QueryParam("windowEnd") String windowEnd,
+ @QueryParam("limit") String limit) {
+ init(res);
+ TimelineEvents events = null;
+ try {
+ UserGroupInformation callerUGI = getUser(req);
+ events = store.getEntityTimelines(
+ parseStr(entityType),
+ parseArrayStr(entityId, ","),
+ parseLongStr(limit),
+ parseLongStr(windowStart),
+ parseLongStr(windowEnd),
+ parseArrayStr(eventType, ","));
+ if (events != null) {
+ Iterator<TimelineEvents.EventsOfOneEntity> eventsItr =
+ events.getAllEvents().iterator();
+ while (eventsItr.hasNext()) {
+ TimelineEvents.EventsOfOneEntity eventsOfOneEntity = eventsItr.next();
+ try {
+ TimelineEntity entity = store.getEntity(
+ eventsOfOneEntity.getEntityId(),
+ eventsOfOneEntity.getEntityType(),
+ EnumSet.of(Field.PRIMARY_FILTERS));
+ // check ACLs
+ if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
+ eventsItr.remove();
+ }
+ } catch (Exception e) {
+ LOG.error("Error when verifying access for user " + callerUGI
+ + " on the events of the timeline entity "
+ + new EntityIdentifier(eventsOfOneEntity.getEntityId(),
+ eventsOfOneEntity.getEntityType()), e);
+ eventsItr.remove();
+ }
+ }
+ }
+ } catch (NumberFormatException e) {
+ throw new BadRequestException(
+ "windowStart, windowEnd or limit is not a numeric value.");
+ } catch (IOException e) {
+ LOG.error("Error getting entity timelines", e);
+ throw new WebApplicationException(e,
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ if (events == null) {
+ return new TimelineEvents();
+ }
+ return events;
+ }
+
+ /**
+ * Store the given entities into the timeline store, and return the errors
+ * that happen during storing.
+ */
+ @POST
+ @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ public TimelinePutResponse postEntities(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ TimelineEntities entities) {
+ init(res);
+ if (entities == null) {
+ return new TimelinePutResponse();
+ }
+ UserGroupInformation callerUGI = getUser(req);
+ try {
+ List<EntityIdentifier> entityIDs = new ArrayList<EntityIdentifier>();
+ TimelineEntities entitiesToPut = new TimelineEntities();
+ List<TimelinePutResponse.TimelinePutError> errors =
+ new ArrayList<TimelinePutResponse.TimelinePutError>();
+ for (TimelineEntity entity : entities.getEntities()) {
+ EntityIdentifier entityID =
+ new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
+
+ // check if there is existing entity
+ TimelineEntity existingEntity = null;
+ try {
+ existingEntity =
+ store.getEntity(entityID.getId(), entityID.getType(),
+ EnumSet.of(Field.PRIMARY_FILTERS));
+ if (existingEntity != null
+ && !timelineACLsManager.checkAccess(callerUGI, existingEntity)) {
+ throw new YarnException("The timeline entity " + entityID
+ + " was not put by " + callerUGI + " before");
+ }
+ } catch (Exception e) {
+ // Skip the entity which already exists and was put by others
+ LOG.warn("Skip the timeline entity: " + entityID + ", because "
+ + e.getMessage());
+ TimelinePutResponse.TimelinePutError error =
+ new TimelinePutResponse.TimelinePutError();
+ error.setEntityId(entityID.getId());
+ error.setEntityType(entityID.getType());
+ error.setErrorCode(
+ TimelinePutResponse.TimelinePutError.ACCESS_DENIED);
+ errors.add(error);
+ continue;
+ }
+
+ // inject owner information for the access check if this is the first
+ // time to post the entity, in case it's the admin who is updating
+ // the timeline data.
+ try {
+ if (existingEntity == null) {
+ injectOwnerInfo(entity,
+ callerUGI == null ? "" : callerUGI.getShortUserName());
+ }
+ } catch (YarnException e) {
+ // Skip the entity which messes up the primary filter and record the
+ // error
+ LOG.warn("Skip the timeline entity: " + entityID + ", because "
+ + e.getMessage());
+ TimelinePutResponse.TimelinePutError error =
+ new TimelinePutResponse.TimelinePutError();
+ error.setEntityId(entityID.getId());
+ error.setEntityType(entityID.getType());
+ error.setErrorCode(
+ TimelinePutResponse.TimelinePutError.SYSTEM_FILTER_CONFLICT);
+ errors.add(error);
+ continue;
+ }
+
+ entityIDs.add(entityID);
+ entitiesToPut.addEntity(entity);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing the entity " + entityID + ", JSON-style content: "
+ + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs));
+ }
+ TimelinePutResponse response = store.put(entitiesToPut);
+ // add the errors of timeline system filter key conflict
+ response.addErrors(errors);
+ return response;
+ } catch (IOException e) {
+ LOG.error("Error putting entities", e);
+ throw new WebApplicationException(e,
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ private void init(HttpServletResponse response) {
+ response.setContentType(null);
+ }
+
+ private static SortedSet<String> parseArrayStr(String str, String delimiter) {
+ if (str == null) {
+ return null;
+ }
+ SortedSet<String> strSet = new TreeSet<String>();
+ String[] strs = str.split(delimiter);
+ for (String aStr : strs) {
+ strSet.add(aStr.trim());
+ }
+ return strSet;
+ }
+
+ private static NameValuePair parsePairStr(String str, String delimiter) {
+ if (str == null) {
+ return null;
+ }
+ String[] strs = str.split(delimiter, 2);
+ try {
+ return new NameValuePair(strs[0].trim(),
+ GenericObjectMapper.OBJECT_READER.readValue(strs[1].trim()));
+ } catch (Exception e) {
+ // didn't work as an Object, keep it as a String
+ return new NameValuePair(strs[0].trim(), strs[1].trim());
+ }
+ }
+
+ private static Collection<NameValuePair> parsePairsStr(
+ String str, String aDelimiter, String pDelimiter) {
+ if (str == null) {
+ return null;
+ }
+ String[] strs = str.split(aDelimiter);
+ Set<NameValuePair> pairs = new HashSet<NameValuePair>();
+ for (String aStr : strs) {
+ pairs.add(parsePairStr(aStr, pDelimiter));
+ }
+ return pairs;
+ }
+
+ private static EnumSet<Field> parseFieldsStr(String str, String delimiter) {
+ if (str == null) {
+ return null;
+ }
+ String[] strs = str.split(delimiter);
+ List<Field> fieldList = new ArrayList<Field>();
+ for (String s : strs) {
+ s = s.trim().toUpperCase();
+ if (s.equals("EVENTS")) {
+ fieldList.add(Field.EVENTS);
+ } else if (s.equals("LASTEVENTONLY")) {
+ fieldList.add(Field.LAST_EVENT_ONLY);
+ } else if (s.equals("RELATEDENTITIES")) {
+ fieldList.add(Field.RELATED_ENTITIES);
+ } else if (s.equals("PRIMARYFILTERS")) {
+ fieldList.add(Field.PRIMARY_FILTERS);
+ } else if (s.equals("OTHERINFO")) {
+ fieldList.add(Field.OTHER_INFO);
+ } else {
+ throw new IllegalArgumentException("Requested nonexistent field " + s);
+ }
+ }
+ if (fieldList.size() == 0) {
+ return null;
+ }
+ Field f1 = fieldList.remove(fieldList.size() - 1);
+ if (fieldList.size() == 0) {
+ return EnumSet.of(f1);
+ } else {
+ return EnumSet.of(f1, fieldList.toArray(new Field[fieldList.size()]));
+ }
+ }
+
+ private static boolean extendFields(EnumSet<Field> fieldEnums) {
+ boolean modified = false;
+ if (fieldEnums != null && !fieldEnums.contains(Field.PRIMARY_FILTERS)) {
+ fieldEnums.add(Field.PRIMARY_FILTERS);
+ modified = true;
+ }
+ return modified;
+ }
+ private static Long parseLongStr(String str) {
+ return str == null ? null : Long.parseLong(str.trim());
+ }
+
+ private static String parseStr(String str) {
+ return str == null ? null : str.trim();
+ }
+
+ private static UserGroupInformation getUser(HttpServletRequest req) {
+ String remoteUser = req.getRemoteUser();
+ UserGroupInformation callerUGI = null;
+ if (remoteUser != null) {
+ callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+ }
+ return callerUGI;
+ }
+
+ private static void injectOwnerInfo(TimelineEntity timelineEntity,
+ String owner) throws YarnException {
+ if (timelineEntity.getPrimaryFilters() != null &&
+ timelineEntity.getPrimaryFilters().containsKey(
+ TimelineStore.SystemFilter.ENTITY_OWNER)) {
+ throw new YarnException(
+ "User should not use the timeline system filter key: "
+ + TimelineStore.SystemFilter.ENTITY_OWNER);
+ }
+ timelineEntity.addPrimaryFilter(
+ TimelineStore.SystemFilter.ENTITY_OWNER
+ .toString(), owner);
+ }
+
+ private static void cleanupOwnerInfo(TimelineEntity timelineEntity) {
+ if (timelineEntity.getPrimaryFilters() != null) {
+ timelineEntity.getPrimaryFilters().remove(
+ TimelineStore.SystemFilter.ENTITY_OWNER.toString());
+ }
+ }
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestGenericObjectMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestGenericObjectMapper.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestGenericObjectMapper.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestGenericObjectMapper.java Wed May 28 18:09:04 2014
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TestGenericObjectMapper {
+
+ @Test
+ public void testEncoding() {
+ testEncoding(Long.MAX_VALUE);
+ testEncoding(Long.MIN_VALUE);
+ testEncoding(0l);
+ testEncoding(128l);
+ testEncoding(256l);
+ testEncoding(512l);
+ testEncoding(-256l);
+ }
+
+ private static void testEncoding(long l) {
+ byte[] b = GenericObjectMapper.writeReverseOrderedLong(l);
+ assertEquals("error decoding", l,
+ GenericObjectMapper.readReverseOrderedLong(b, 0));
+ byte[] buf = new byte[16];
+ System.arraycopy(b, 0, buf, 5, 8);
+ assertEquals("error decoding at offset", l,
+ GenericObjectMapper.readReverseOrderedLong(buf, 5));
+ if (l > Long.MIN_VALUE) {
+ byte[] a = GenericObjectMapper.writeReverseOrderedLong(l-1);
+ assertEquals("error preserving ordering", 1,
+ WritableComparator.compareBytes(a, 0, a.length, b, 0, b.length));
+ }
+ if (l < Long.MAX_VALUE) {
+ byte[] c = GenericObjectMapper.writeReverseOrderedLong(l+1);
+ assertEquals("error preserving ordering", 1,
+ WritableComparator.compareBytes(b, 0, b.length, c, 0, c.length));
+ }
+ }
+
+ private static void verify(Object o) throws IOException {
+ assertEquals(o, GenericObjectMapper.read(GenericObjectMapper.write(o)));
+ }
+
+ @Test
+ public void testValueTypes() throws IOException {
+ verify(Integer.MAX_VALUE);
+ verify(Integer.MIN_VALUE);
+ assertEquals(Integer.MAX_VALUE, GenericObjectMapper.read(
+ GenericObjectMapper.write((long) Integer.MAX_VALUE)));
+ assertEquals(Integer.MIN_VALUE, GenericObjectMapper.read(
+ GenericObjectMapper.write((long) Integer.MIN_VALUE)));
+ verify((long)Integer.MAX_VALUE + 1l);
+ verify((long)Integer.MIN_VALUE - 1l);
+
+ verify(Long.MAX_VALUE);
+ verify(Long.MIN_VALUE);
+
+ assertEquals(42, GenericObjectMapper.read(GenericObjectMapper.write(42l)));
+ verify(42);
+ verify(1.23);
+ verify("abc");
+ verify(true);
+ List<String> list = new ArrayList<String>();
+ list.add("123");
+ list.add("abc");
+ verify(list);
+ Map<String,String> map = new HashMap<String,String>();
+ map.put("k1","v1");
+ map.put("k2","v2");
+ verify(map);
+ }
+
+}