You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by zj...@apache.org on 2014/05/28 20:09:05 UTC

svn commit: r1598094 [2/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-common/src/main/resources/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/appl...

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java Wed May 28 18:09:04 2014
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+
+/**
+ * In-memory implementation of {@link TimelineStore}. This implementation is
+ * for testing purposes only. Each instance keeps its own data, so code that
+ * instantiates it more than once may end up writing history data to one
+ * in-memory store and reading it from another.
+ * 
+ */
+@Private
+@Unstable
+public class MemoryTimelineStore
+    extends AbstractService implements TimelineStore {
+
+  private Map<EntityIdentifier, TimelineEntity> entities =
+      new HashMap<EntityIdentifier, TimelineEntity>(); // stored entities, keyed by (entity id, entity type)
+  private Map<EntityIdentifier, Long> entityInsertTimes =
+      new HashMap<EntityIdentifier, Long>(); // System.currentTimeMillis() taken when each entity was first stored
+
+  public MemoryTimelineStore() { // passes this class's name to the AbstractService constructor
+    super(MemoryTimelineStore.class.getName());
+  }
+
+  /**
+   * Returns the entities of {@code entityType} that pass the start-time
+   * window, insert-time and filter checks, capped at {@code limit} results.
+   * <p>
+   * Candidates are visited in the natural ordering of {@link TimelineEntity},
+   * so the limit keeps the first matches in that ordering instead of an
+   * arbitrary subset of the store.
+   *
+   * @param entityType type of the entities to return
+   * @param limit maximum number of entities to return; if null,
+   *          {@link TimelineReader#DEFAULT_LIMIT} is used
+   * @param windowStart exclusive lower bound on the entity start time; if
+   *          null, {@link Long#MIN_VALUE} is used
+   * @param windowEnd inclusive upper bound on the entity start time; if null,
+   *          {@link Long#MAX_VALUE} is used
+   * @param fromId if not null, only the entity with this id and the entities
+   *          ordered after it are considered; an empty result is returned
+   *          when no such entity is stored
+   * @param fromTs if not null, entities whose recorded insert time is later
+   *          than this value are skipped
+   * @param primaryFilter if not null, only entities whose primary filters
+   *          contain this name/value pair are returned
+   * @param secondaryFilters if not null, every non-null filter must match
+   *          either the primary filters or the other info of the entity
+   * @param fields fields to expose on the returned copies; if null, all
+   *          fields are exposed
+   * @return a wrapper around masked copies of the selected entities
+   */
+  @Override
+  public TimelineEntities getEntities(String entityType, Long limit,
+      Long windowStart, Long windowEnd, String fromId, Long fromTs,
+      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+      EnumSet<Field> fields) {
+    if (limit == null) {
+      limit = DEFAULT_LIMIT;
+    }
+    if (windowStart == null) {
+      windowStart = Long.MIN_VALUE;
+    }
+    if (windowEnd == null) {
+      windowEnd = Long.MAX_VALUE;
+    }
+    if (fields == null) {
+      fields = EnumSet.allOf(Field.class);
+    }
+
+    Iterator<TimelineEntity> entityIterator;
+    if (fromId != null) {
+      TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId,
+          entityType));
+      if (firstEntity == null) {
+        return new TimelineEntities();
+      }
+      // Start at the requested entity and continue in sorted order.
+      entityIterator = new TreeSet<TimelineEntity>(entities.values())
+          .tailSet(firstEntity, true).iterator();
+    } else {
+      // PriorityQueue.iterator() makes no ordering guarantee, so walking it
+      // while enforcing the limit could keep the wrong entities. Iterate a
+      // sorted copy instead.
+      List<TimelineEntity> orderedEntities =
+          new ArrayList<TimelineEntity>(entities.values());
+      Collections.sort(orderedEntities);
+      entityIterator = orderedEntities.iterator();
+    }
+
+    List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();
+    while (entityIterator.hasNext()) {
+      TimelineEntity entity = entityIterator.next();
+      if (entitiesSelected.size() >= limit) {
+        break;
+      }
+      if (!entity.getEntityType().equals(entityType)) {
+        continue;
+      }
+      // The window is (windowStart, windowEnd] on the entity start time.
+      if (entity.getStartTime() <= windowStart) {
+        continue;
+      }
+      if (entity.getStartTime() > windowEnd) {
+        continue;
+      }
+      if (fromTs != null && entityInsertTimes.get(new EntityIdentifier(
+          entity.getEntityId(), entity.getEntityType())) > fromTs) {
+        continue;
+      }
+      if (primaryFilter != null &&
+          !matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) {
+        continue;
+      }
+      if (secondaryFilters != null) { // AND logic
+        boolean flag = true;
+        for (NameValuePair secondaryFilter : secondaryFilters) {
+          // A secondary filter may be satisfied by either the primary
+          // filters or the other info of the entity.
+          if (secondaryFilter != null && !matchPrimaryFilter(
+              entity.getPrimaryFilters(), secondaryFilter) &&
+              !matchFilter(entity.getOtherInfo(), secondaryFilter)) {
+            flag = false;
+            break;
+          }
+        }
+        if (!flag) {
+          continue;
+        }
+      }
+      entitiesSelected.add(entity);
+    }
+    // Hand out masked copies, never the stored instances themselves.
+    List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
+    for (TimelineEntity entitySelected : entitiesSelected) {
+      entitiesToReturn.add(maskFields(entitySelected, fields));
+    }
+    Collections.sort(entitiesToReturn);
+    TimelineEntities entitiesWrapper = new TimelineEntities();
+    entitiesWrapper.setEntities(entitiesToReturn);
+    return entitiesWrapper;
+  }
+
+  /**
+   * Looks up one stored entity by id and type and returns a masked copy.
+   *
+   * @param entityId id of the entity to look up
+   * @param entityType type of the entity to look up
+   * @param fieldsToRetrieve fields to expose on the copy; if null, all
+   *          fields are exposed
+   * @return the masked copy, or null when no such entity is stored
+   */
+  @Override
+  public TimelineEntity getEntity(String entityId, String entityType,
+      EnumSet<Field> fieldsToRetrieve) {
+    TimelineEntity stored =
+        entities.get(new EntityIdentifier(entityId, entityType));
+    if (stored == null) {
+      return null;
+    }
+    EnumSet<Field> visibleFields = (fieldsToRetrieve != null)
+        ? fieldsToRetrieve
+        : EnumSet.allOf(Field.class);
+    return maskFields(stored, visibleFields);
+  }
+
+  /**
+   * Collects, for each requested entity id that is stored, the events that
+   * fall inside the time window and match the requested event types.
+   *
+   * @param entityType type shared by all requested entities
+   * @param entityIds ids to look up; if null, an empty result is returned
+   * @param limit maximum number of events per entity; if null,
+   *          {@link TimelineReader#DEFAULT_LIMIT} is used
+   * @param windowStart exclusive lower bound on the event timestamp; if
+   *          null, {@link Long#MIN_VALUE} is used
+   * @param windowEnd inclusive upper bound on the event timestamp; if null,
+   *          {@link Long#MAX_VALUE} is used
+   * @param eventTypes event types to keep; if null, all types are kept
+   * @return the collected events, one group per entity that was found
+   */
+  @Override
+  public TimelineEvents getEntityTimelines(String entityType,
+      SortedSet<String> entityIds, Long limit, Long windowStart,
+      Long windowEnd,
+      Set<String> eventTypes) {
+    TimelineEvents result = new TimelineEvents();
+    if (entityIds == null) {
+      return result;
+    }
+    final long maxEvents = (limit == null) ? DEFAULT_LIMIT : limit;
+    final long afterExclusive =
+        (windowStart == null) ? Long.MIN_VALUE : windowStart;
+    final long untilInclusive =
+        (windowEnd == null) ? Long.MAX_VALUE : windowEnd;
+
+    for (String id : entityIds) {
+      TimelineEntity stored =
+          entities.get(new EntityIdentifier(id, entityType));
+      if (stored == null) {
+        continue;
+      }
+      EventsOfOneEntity perEntity = new EventsOfOneEntity();
+      perEntity.setEntityId(id);
+      perEntity.setEntityType(entityType);
+      Iterator<TimelineEvent> cursor = stored.getEvents().iterator();
+      // Stop scanning as soon as this entity has reached the event limit.
+      while (cursor.hasNext() && perEntity.getEvents().size() < maxEvents) {
+        TimelineEvent candidate = cursor.next();
+        long timestamp = candidate.getTimestamp();
+        boolean insideWindow =
+            timestamp > afterExclusive && timestamp <= untilInclusive;
+        if (insideWindow && (eventTypes == null
+            || eventTypes.contains(candidate.getEventType()))) {
+          perEntity.addEvent(candidate);
+        }
+      }
+      result.addEvent(perEntity);
+    }
+    return result;
+  }
+
+  @Override
+  public TimelinePutResponse put(TimelineEntities data) { // merges each incoming entity into the maps; per-entity errors are collected in the response
+    TimelinePutResponse response = new TimelinePutResponse();
+    for (TimelineEntity entity : data.getEntities()) {
+      EntityIdentifier entityId =
+          new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
+      // store entity info in memory: reuse the stored copy, or create one on the first put
+      TimelineEntity existingEntity = entities.get(entityId);
+      if (existingEntity == null) {
+        existingEntity = new TimelineEntity();
+        existingEntity.setEntityId(entity.getEntityId());
+        existingEntity.setEntityType(entity.getEntityType());
+        existingEntity.setStartTime(entity.getStartTime());
+        entities.put(entityId, existingEntity);
+        entityInsertTimes.put(entityId, System.currentTimeMillis()); // insert time is the store clock at first put
+      }
+      if (entity.getEvents() != null) {
+        if (existingEntity.getEvents() == null) {
+          existingEntity.setEvents(entity.getEvents()); // NOTE(review): stores the caller's list itself, which is then sorted below — confirm this sharing is intended
+        } else {
+          existingEntity.addEvents(entity.getEvents());
+        }
+        Collections.sort(existingEntity.getEvents()); // keep the stored events in their natural order
+      }
+      // check startTime: without one, derive it from the events or reject the entity
+      if (existingEntity.getStartTime() == null) {
+        if (existingEntity.getEvents() == null
+            || existingEntity.getEvents().isEmpty()) {
+          TimelinePutError error = new TimelinePutError();
+          error.setEntityId(entityId.getId());
+          error.setEntityType(entityId.getType());
+          error.setErrorCode(TimelinePutError.NO_START_TIME);
+          response.addError(error);
+          entities.remove(entityId); // drop the entity and its insert time from the store
+          entityInsertTimes.remove(entityId);
+          continue;
+        } else {
+          Long min = Long.MAX_VALUE; // earliest timestamp among the incoming events becomes the start time
+          for (TimelineEvent e : entity.getEvents()) {
+            if (min > e.getTimestamp()) {
+              min = e.getTimestamp();
+            }
+          }
+          existingEntity.setStartTime(min);
+        }
+      }
+      if (entity.getPrimaryFilters() != null) {
+        if (existingEntity.getPrimaryFilters() == null) {
+          existingEntity.setPrimaryFilters(new HashMap<String, Set<Object>>());
+        }
+        for (Entry<String, Set<Object>> pf :
+            entity.getPrimaryFilters().entrySet()) {
+          for (Object pfo : pf.getValue()) {
+            existingEntity.addPrimaryFilter(pf.getKey(), maybeConvert(pfo)); // Long values in int range are stored as Integer
+          }
+        }
+      }
+      if (entity.getOtherInfo() != null) {
+        if (existingEntity.getOtherInfo() == null) {
+          existingEntity.setOtherInfo(new HashMap<String, Object>());
+        }
+        for (Entry<String, Object> info : entity.getOtherInfo().entrySet()) {
+          existingEntity.addOtherInfo(info.getKey(),
+              maybeConvert(info.getValue()));
+        }
+      }
+      // relate it to other entities: record a back-reference to this entity on each related entity
+      if (entity.getRelatedEntities() == null) {
+        continue;
+      }
+      for (Map.Entry<String, Set<String>> partRelatedEntities : entity
+          .getRelatedEntities().entrySet()) { // key is the related entity type, value the related ids
+        if (partRelatedEntities == null) { // NOTE(review): entrySet() entries are presumably never null — confirm
+          continue;
+        }
+        for (String idStr : partRelatedEntities.getValue()) {
+          EntityIdentifier relatedEntityId =
+              new EntityIdentifier(idStr, partRelatedEntities.getKey());
+          TimelineEntity relatedEntity = entities.get(relatedEntityId);
+          if (relatedEntity != null) {
+            relatedEntity.addRelatedEntity(
+                existingEntity.getEntityType(), existingEntity.getEntityId());
+          } else { // unknown related entity: create a placeholder that takes this entity's start time
+            relatedEntity = new TimelineEntity();
+            relatedEntity.setEntityId(relatedEntityId.getId());
+            relatedEntity.setEntityType(relatedEntityId.getType());
+            relatedEntity.setStartTime(existingEntity.getStartTime());
+            relatedEntity.addRelatedEntity(existingEntity.getEntityType(),
+                existingEntity.getEntityId());
+            entities.put(relatedEntityId, relatedEntity);
+            entityInsertTimes.put(relatedEntityId, System.currentTimeMillis());
+          }
+        }
+      }
+    }
+    return response;
+  }
+
+  /**
+   * Builds a new entity that carries the id, type and start time of
+   * {@code entity} plus only those optional fields named in {@code fields}.
+   * Fields that are not requested are set to null on the result.
+   * <p>
+   * When only {@link Field#LAST_EVENT_ONLY} is requested and the entity has
+   * no events (for example a placeholder created for a related entity), no
+   * event is added instead of failing on an empty list.
+   *
+   * @param entity the stored entity to copy from
+   * @param fields the fields to expose; must not be null
+   * @return a new entity exposing only the requested fields
+   */
+  private static TimelineEntity maskFields(
+      TimelineEntity entity, EnumSet<Field> fields) {
+    // Conceal the fields that are not going to be exposed
+    TimelineEntity entityToReturn = new TimelineEntity();
+    entityToReturn.setEntityId(entity.getEntityId());
+    entityToReturn.setEntityType(entity.getEntityType());
+    entityToReturn.setStartTime(entity.getStartTime());
+    // Copy the requested fields into the new entity. The individual event and
+    // value objects are handed over as they are, not cloned.
+    if (fields.contains(Field.EVENTS)) {
+      entityToReturn.addEvents(entity.getEvents());
+    } else if (fields.contains(Field.LAST_EVENT_ONLY)) {
+      // Index 0 is used as the last event; this relies on put() keeping the
+      // stored events sorted.
+      List<TimelineEvent> events = entity.getEvents();
+      if (events != null && !events.isEmpty()) {
+        entityToReturn.addEvent(events.get(0));
+      }
+    } else {
+      entityToReturn.setEvents(null);
+    }
+    if (fields.contains(Field.RELATED_ENTITIES)) {
+      entityToReturn.addRelatedEntities(entity.getRelatedEntities());
+    } else {
+      entityToReturn.setRelatedEntities(null);
+    }
+    if (fields.contains(Field.PRIMARY_FILTERS)) {
+      entityToReturn.addPrimaryFilters(entity.getPrimaryFilters());
+    } else {
+      entityToReturn.setPrimaryFilters(null);
+    }
+    if (fields.contains(Field.OTHER_INFO)) {
+      entityToReturn.addOtherInfo(entity.getOtherInfo());
+    } else {
+      entityToReturn.setOtherInfo(null);
+    }
+    return entityToReturn;
+  }
+
+  /**
+   * Tells whether {@code tags} maps the filter's name to a value equal to the
+   * filter's value.
+   *
+   * @param tags single-valued attributes of an entity
+   * @param filter the name/value pair to look for
+   * @return true only if the name is present and its value equals the
+   *         filter's value
+   */
+  private static boolean matchFilter(Map<String, Object> tags,
+      NameValuePair filter) {
+    Object actual = tags.get(filter.getName());
+    // A missing name and a differing value are both a mismatch.
+    return actual != null && actual.equals(filter.getValue());
+  }
+
+  /**
+   * Tells whether the set of values that {@code tags} holds for the filter's
+   * name contains the filter's value.
+   *
+   * @param tags multi-valued attributes of an entity
+   * @param filter the name/value pair to look for
+   * @return true only if the name is present and its value set contains the
+   *         filter's value
+   */
+  private static boolean matchPrimaryFilter(Map<String, Set<Object>> tags,
+      NameValuePair filter) {
+    Set<Object> candidates = tags.get(filter.getName());
+    // No entry for the name means the entity does not have the filter.
+    return candidates != null && candidates.contains(filter.getValue());
+  }
+
+  /**
+   * Narrows a {@link Long} that fits into the {@code int} range to an
+   * {@link Integer}; any other object is returned unchanged.
+   *
+   * @param o the value to inspect, may be null
+   * @return an Integer for a Long within the int range, otherwise {@code o}
+   */
+  private static Object maybeConvert(Object o) {
+    if (!(o instanceof Long)) {
+      return o;
+    }
+    long number = ((Long) o).longValue();
+    boolean fitsInInt =
+        number >= Integer.MIN_VALUE && number <= Integer.MAX_VALUE;
+    return fitsInInt ? Integer.valueOf((int) number) : o;
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/NameValuePair.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/NameValuePair.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/NameValuePair.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/NameValuePair.java Wed May 28 18:09:04 2014
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A class holding a name and value pair, used for specifying filters in
+ * {@link TimelineReader}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class NameValuePair {
+  String name;
+  Object value;
+
+  public NameValuePair(String name, Object value) {
+    this.name = name;
+    this.value = value;
+  }
+
+  /**
+   * Get the name.
+   * @return The name.
+   */
+  public String getName() {
+
+    return name;
+  }
+
+  /**
+   * Get the value.
+   * @return The value.
+   */
+  public Object getValue() {
+    return value;
+  }
+
+  @Override
+  public String toString() {
+    return "{ name: " + name + ", value: " + value + " }";
+  }
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineReader.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineReader.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineReader.java Wed May 28 18:09:04 2014
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.SortedSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+
+/**
+ * This interface is for retrieving timeline information: entities, a single
+ * entity, or the events of a set of entities.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface TimelineReader {
+
+  /**
+   * Possible fields to retrieve for {@link #getEntities} and
+   * {@link #getEntity}.
+   */
+  enum Field {
+    EVENTS, // the events of the entity
+    RELATED_ENTITIES, // the related entities of the entity
+    PRIMARY_FILTERS, // the primary filters of the entity
+    OTHER_INFO, // the other info of the entity
+    LAST_EVENT_ONLY // only the most recent event; applies when EVENTS is not also requested
+  }
+
+  /**
+   * Default limit for {@link #getEntities} and {@link #getEntityTimelines},
+   * used when the caller passes a null limit.
+   */
+  final long DEFAULT_LIMIT = 100;
+
+  /**
+   * This method retrieves a list of entity information, {@link TimelineEntity},
+   * sorted by the starting timestamp for the entity, descending. The starting
+   * timestamp of an entity is a timestamp specified by the client. If it is not
+   * explicitly specified, it will be chosen by the store to be the earliest
+   * timestamp of the events received in the first put for the entity.
+   * 
+   * @param entityType
+   *          The type of entities to return (required).
+   * @param limit
+   *          A limit on the number of entities to return. If null, defaults to
+   *          {@link #DEFAULT_LIMIT}.
+   * @param windowStart
+   *          The earliest start timestamp to retrieve (exclusive). If null,
+   *          defaults to retrieving all entities until the limit is reached.
+   * @param windowEnd
+   *          The latest start timestamp to retrieve (inclusive). If null,
+   *          defaults to {@link Long#MAX_VALUE}.
+   * @param fromId
+   *          If fromId is not null, retrieve entities earlier than and
+   *          including the specified ID. If no start time is found for the
+   *          specified ID, an empty list of entities will be returned. The
+   *          windowEnd parameter will take precedence if the start time of this
+   *          entity falls later than windowEnd.
+   * @param fromTs
+   *          If fromTs is not null, ignore entities that were inserted into the
+   *          store after the given timestamp. The entity's insert timestamp
+   *          used for this comparison is the store's system time when the first
+   *          put for the entity was received (not the entity's start time).
+   * @param primaryFilter
+   *          Retrieves only entities that have the specified primary filter. If
+   *          null, retrieves all entities. This is an indexed retrieval, and no
+   *          entities that do not match the filter are scanned.
+   * @param secondaryFilters
+   *          Retrieves only entities that have exact matches for all the
+   *          specified filters in their primary filters or other info. This is
+   *          not an indexed retrieval, so all entities are scanned but only
+   *          those matching the filters are returned.
+   * @param fieldsToRetrieve
+   *          Specifies which fields of the entity object to retrieve (see
+   *          {@link Field}). If the set of fields contains
+   *          {@link Field#LAST_EVENT_ONLY} and not {@link Field#EVENTS}, the
+   *          most recent event for each entity is retrieved. If null, retrieves
+   *          all fields.
+   * @return A {@link TimelineEntities} object.
+   * @throws IOException
+   *           presumably when the backing store cannot be read; the exact
+   *           conditions depend on the implementation.
+   */
+  TimelineEntities getEntities(String entityType,
+      Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs,
+      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+      EnumSet<Field> fieldsToRetrieve) throws IOException;
+
+  /**
+   * This method retrieves the entity information for a given entity.
+   * 
+   * @param entityId
+   *          The entity whose information will be retrieved.
+   * @param entityType
+   *          The type of the entity.
+   * @param fieldsToRetrieve
+   *          Specifies which fields of the entity object to retrieve (see
+   *          {@link Field}). If the set of fields contains
+   *          {@link Field#LAST_EVENT_ONLY} and not {@link Field#EVENTS}, the
+   *          most recent event for each entity is retrieved. If null, retrieves
+   *          all fields.
+   * @return A {@link TimelineEntity} object.
+   * @throws IOException
+   *           presumably when the backing store cannot be read; the exact
+   *           conditions depend on the implementation.
+   */
+  TimelineEntity getEntity(String entityId, String entityType, EnumSet<Field>
+      fieldsToRetrieve) throws IOException;
+
+  /**
+   * This method retrieves the events for a list of entities all of the same
+   * entity type. The events for each entity are sorted in order of their
+   * timestamps, descending.
+   * 
+   * @param entityType
+   *          The type of entities to retrieve events for.
+   * @param entityIds
+   *          The entity IDs to retrieve events for.
+   * @param limit
+   *          A limit on the number of events to return for each entity. If
+   *          null, defaults to {@link #DEFAULT_LIMIT} events per entity.
+   * @param windowStart
+   *          If not null, retrieves only events later than the given time
+   *          (exclusive).
+   * @param windowEnd
+   *          If not null, retrieves only events earlier than the given time
+   *          (inclusive).
+   * @param eventTypes
+   *          Restricts the events returned to the given types. If null, events
+   *          of all types will be returned.
+   * @return A {@link TimelineEvents} object.
+   * @throws IOException
+   *           presumably when the backing store cannot be read; the exact
+   *           conditions depend on the implementation.
+   */
+  TimelineEvents getEntityTimelines(String entityType,
+      SortedSet<String> entityIds, Long limit, Long windowStart,
+      Long windowEnd, Set<String> eventTypes) throws IOException;
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStore.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStore.java Wed May 28 18:09:04 2014
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+
+@Private
+@Unstable
+public interface TimelineStore extends
+    Service, TimelineReader, TimelineWriter { // a store is a service that can both read and write timeline data
+
+  /**
+   * The system filter which will be automatically added to a
+   * {@link TimelineEntity}'s primary filter section when storing the entity.
+   * The filter key is case sensitive. Users should not use keys that are
+   * reserved by the timeline system.
+   */
+  @Private
+  enum SystemFilter {
+    ENTITY_OWNER // NOTE(review): presumably the key under which the entity's owner is recorded; its use is not visible here — confirm
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineWriter.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineWriter.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineWriter.java Wed May 28 18:09:04 2014
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
+import java.io.IOException;
+
+/**
+ * This interface is for storing timeline information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface TimelineWriter {
+
+  /**
+   * Stores entity information to the timeline store. Any errors occurring for
+   * individual put request objects will be reported in the response.
+   * 
+   * @param data
+   *          A {@link TimelineEntities} object holding the entities to store.
+   * @return A {@link TimelinePutResponse} object listing the per-entity
+   *         errors, if any.
+   * @throws IOException
+   *           presumably when the backing store cannot be written; the exact
+   *           conditions depend on the implementation.
+   */
+  TimelinePutResponse put(TimelineEntities data) throws IOException;
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/package-info.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/package-info.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/package-info.java Wed May 28 18:09:04 2014
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.yarn.server.timeline;
+import org.apache.hadoop.classification.InterfaceAudience;

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineACLsManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineACLsManager.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineACLsManager.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineACLsManager.java Wed May 28 18:09:04 2014
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline.security;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AdminACLsManager;
+import org.apache.hadoop.yarn.server.timeline.EntityIdentifier;
+import org.apache.hadoop.yarn.server.timeline.TimelineStore.SystemFilter;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * <code>TimelineACLsManager</code> checks the entity level timeline data
+ * access.
+ */
+@Private
+public class TimelineACLsManager {
+
+  private static final Log LOG = LogFactory.getLog(TimelineACLsManager.class);
+
+  private AdminACLsManager adminAclsManager;
+
+  /**
+   * Creates the manager, building the underlying {@link AdminACLsManager}
+   * from the given configuration.
+   *
+   * @param conf
+   *          the configuration handed to the {@link AdminACLsManager}
+   */
+  public TimelineACLsManager(Configuration conf) {
+    this.adminAclsManager = new AdminACLsManager(conf);
+  }
+
+  /**
+   * Checks whether the caller may access the given timeline entity.
+   * <p>
+   * Access is always granted when ACLs are disabled. Otherwise the entity
+   * must carry exactly one {@link SystemFilter#ENTITY_OWNER} primary filter
+   * value, and access is granted only if the caller is an admin or the
+   * caller's short user name equals that owner.
+   * </p>
+   *
+   * @param callerUGI
+   *          the user requesting access; may be <code>null</code>, in which
+   *          case access is denied when ACLs are enabled
+   * @param entity
+   *          the timeline entity being accessed
+   * @return <code>true</code> if the access is allowed, <code>false</code>
+   *         otherwise
+   * @throws YarnException
+   *           if ACLs are enabled and the entity does not carry exactly one
+   *           owner value
+   * @throws IOException
+   *           declared for callers; not thrown directly by this method
+   */
+  public boolean checkAccess(UserGroupInformation callerUGI,
+      TimelineEntity entity) throws YarnException, IOException {
+    if (LOG.isDebugEnabled()) {
+      // callerUGI is allowed to be null (see the check below), so it must not
+      // be dereferenced unconditionally just because debug logging is on.
+      LOG.debug("Verifying the access of "
+          + (callerUGI == null ? null : callerUGI.getShortUserName())
+          + " on the timeline entity "
+          + new EntityIdentifier(entity.getEntityId(), entity.getEntityType()));
+    }
+
+    if (!adminAclsManager.areACLsEnabled()) {
+      return true;
+    }
+
+    Set<Object> values =
+        entity.getPrimaryFilters().get(
+            SystemFilter.ENTITY_OWNER.toString());
+    if (values == null || values.size() != 1) {
+      throw new YarnException("Owner information of the timeline entity "
+          + new EntityIdentifier(entity.getEntityId(), entity.getEntityType())
+          + " is corrupted.");
+    }
+    String owner = values.iterator().next().toString();
+    // TODO: Currently we just check the user is the admin or the timeline
+    // entity owner. In the future, we need to check whether the user is in the
+    // allowed user/group list
+    return callerUGI != null
+        && (adminAclsManager.isAdmin(callerUGI) ||
+            callerUGI.getShortUserName().equals(owner));
+  }
+
+  /**
+   * Replaces the {@link AdminACLsManager} used by this instance. Intended for
+   * tests only.
+   *
+   * @param adminAclsManager
+   *          the new admin ACLs manager
+   * @return the admin ACLs manager that was previously in use
+   */
+  @Private
+  @VisibleForTesting
+  public AdminACLsManager
+      setAdminACLsManager(AdminACLsManager adminAclsManager) {
+    AdminACLsManager oldAdminACLsManager = this.adminAclsManager;
+    this.adminAclsManager = adminAclsManager;
+    return oldAdminACLsManager;
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilter.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilter.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilter.java Wed May 28 18:09:04 2014
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline.security;
+
+import java.util.Properties;
+
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
+
+/**
+ * An {@link AuthenticationFilter} for the timeline server that swaps the
+ * plain "kerberos" authentication type for
+ * {@link TimelineClientAuthenticationService}.
+ */
+@Private
+@Unstable
+public class TimelineAuthenticationFilter extends AuthenticationFilter {
+
+  /**
+   * Returns the filter configuration produced by the parent class, with the
+   * authentication type rewritten when it is exactly "kerberos".
+   *
+   * @param configPrefix
+   *          the configuration prefix, passed through to the parent
+   * @param filterConfig
+   *          the servlet filter configuration, passed through to the parent
+   * @return the (possibly adjusted) filter properties
+   * @throws ServletException
+   *           if the parent class fails to build the configuration
+   */
+  @Override
+  protected Properties getConfiguration(String configPrefix,
+      FilterConfig filterConfig) throws ServletException {
+    // In yarn-site.xml, we can simply set type to "kerberos". However, we need
+    // to replace the name here to use the customized Kerberos + DT service
+    // instead of the standard Kerberos handler.
+    Properties properties = super.getConfiguration(configPrefix, filterConfig);
+    // Constant-first comparison: an unset auth type must not raise a
+    // NullPointerException here; it is passed through unchanged so the parent
+    // filter can deal with the missing setting.
+    if ("kerberos".equals(properties.getProperty(AUTH_TYPE))) {
+      properties.setProperty(
+          AUTH_TYPE, TimelineClientAuthenticationService.class.getName());
+    }
+    return properties;
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilterInitializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilterInitializer.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilterInitializer.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilterInitializer.java Wed May 28 18:09:04 2014
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline.security;
+
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.FilterContainer;
+import org.apache.hadoop.http.FilterInitializer;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.security.SecurityUtil;
+
+/**
+ * <p>
+ * Initializes {@link TimelineAuthenticationFilter} which provides support for
+ * Kerberos HTTP SPNEGO authentication.
+ * <p/>
+ * <p>
+ * It enables Kerberos HTTP SPNEGO plus delegation token authentication for the
+ * timeline server.
+ * <p/>
+ * Refer to the <code>core-default.xml</code> file, after the comment 'HTTP
+ * Authentication' for details on the configuration options. All related
+ * configuration properties have 'hadoop.http.authentication.' as prefix.
+ */
+public class TimelineAuthenticationFilterInitializer extends FilterInitializer {
+
+  /**
+   * The configuration prefix of timeline Kerberos + DT authentication
+   */
+  public static final String PREFIX = "yarn.timeline-service.http.authentication.";
+
+  private static final String SIGNATURE_SECRET_FILE =
+      TimelineAuthenticationFilter.SIGNATURE_SECRET + ".file";
+
+  /**
+   * <p>
+   * Initializes {@link TimelineAuthenticationFilter}
+   * <p/>
+   * <p>
+   * Propagates to {@link TimelineAuthenticationFilter} configuration all YARN
+   * configuration properties prefixed with
+   * "yarn.timeline-service.authentication."
+   * </p>
+   * 
+   * @param container
+   *          The filter container
+   * @param conf
+   *          Configuration for run-time parameters
+   */
+  @Override
+  public void initFilter(FilterContainer container, Configuration conf) {
+    Map<String, String> filterConfig = new HashMap<String, String>();
+
+    // setting the cookie path to root '/' so it is used for all resources.
+    filterConfig.put(TimelineAuthenticationFilter.COOKIE_PATH, "/");
+
+    for (Map.Entry<String, String> entry : conf) {
+      String name = entry.getKey();
+      if (name.startsWith(PREFIX)) {
+        String value = conf.get(name);
+        name = name.substring(PREFIX.length());
+        filterConfig.put(name, value);
+      }
+    }
+
+    String signatureSecretFile = filterConfig.get(SIGNATURE_SECRET_FILE);
+    if (signatureSecretFile != null) {
+      try {
+        StringBuilder secret = new StringBuilder();
+        Reader reader = new FileReader(signatureSecretFile);
+        int c = reader.read();
+        while (c > -1) {
+          secret.append((char) c);
+          c = reader.read();
+        }
+        reader.close();
+        filterConfig
+            .put(TimelineAuthenticationFilter.SIGNATURE_SECRET,
+                secret.toString());
+      } catch (IOException ex) {
+        throw new RuntimeException(
+            "Could not read HTTP signature secret file: "
+                + signatureSecretFile);
+      }
+    }
+
+    // Resolve _HOST into bind address
+    String bindAddress = conf.get(HttpServer2.BIND_ADDRESS);
+    String principal =
+        filterConfig.get(TimelineClientAuthenticationService.PRINCIPAL);
+    if (principal != null) {
+      try {
+        principal = SecurityUtil.getServerPrincipal(principal, bindAddress);
+      } catch (IOException ex) {
+        throw new RuntimeException(
+            "Could not resolve Kerberos principal name: " + ex.toString(), ex);
+      }
+      filterConfig.put(TimelineClientAuthenticationService.PRINCIPAL,
+          principal);
+    }
+
+    container.addGlobalFilter("Timeline Authentication Filter",
+        TimelineAuthenticationFilter.class.getName(),
+        filterConfig);
+  }
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineClientAuthenticationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineClientAuthenticationService.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineClientAuthenticationService.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineClientAuthenticationService.java Wed May 28 18:09:04 2014
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline.security;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.text.MessageFormat;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.authentication.server.AuthenticationToken;
+import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDelegationTokenResponse;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenOperation;
+import org.apache.hadoop.yarn.security.client.TimelineAuthenticationConsts;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * Server side <code>AuthenticationHandler</code> that authenticates requests
+ * using the incoming delegation token as a 'delegation' query string parameter.
+ * <p>
+ * If no delegation token is present in the request it delegates to the
+ * {@link KerberosAuthenticationHandler}
+ * </p>
+ */
+@Private
+@Unstable
+public class TimelineClientAuthenticationService
+    extends KerberosAuthenticationHandler {
+
+  public static final String TYPE = "kerberos-dt";
+  private static final Set<String> DELEGATION_TOKEN_OPS = new HashSet<String>();
+  private static final String OP_PARAM = "op";
+  private static final String ENTER = System.getProperty("line.separator");
+
+  private ObjectMapper mapper;
+
+  static {
+    DELEGATION_TOKEN_OPS.add(
+        TimelineDelegationTokenOperation.GETDELEGATIONTOKEN.toString());
+    DELEGATION_TOKEN_OPS.add(
+        TimelineDelegationTokenOperation.RENEWDELEGATIONTOKEN.toString());
+    DELEGATION_TOKEN_OPS.add(
+        TimelineDelegationTokenOperation.CANCELDELEGATIONTOKEN.toString());
+  }
+
+  /**
+   * Creates the handler and configures the JSON mapper used to serialize
+   * delegation token responses.
+   */
+  public TimelineClientAuthenticationService() {
+    super();
+    mapper = new ObjectMapper();
+    YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
+  }
+
+  /**
+   * Returns authentication type of the handler.
+   * 
+   * @return <code>kerberos-dt</code>
+   */
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  /**
+   * Handles the delegation token operations (get, renew, cancel) selected by
+   * the <code>op</code> query parameter. Requests whose <code>op</code> is not
+   * a delegation token operation, and all <code>OPTIONS</code> requests, are
+   * left untouched.
+   * <p>
+   * For a delegation token operation the response is completed here: either
+   * an error is sent (wrong HTTP method, missing SPNEGO authentication,
+   * missing token parameter) or the operation is executed and, when it
+   * produces a result, that result is written as JSON.
+   * </p>
+   *
+   * @param token
+   *          the authentication token of the request, or <code>null</code>
+   *          if the request is not authenticated
+   * @param request
+   *          the HTTP client request.
+   * @param response
+   *          the HTTP client response.
+   * @return <code>true</code> if request processing should continue,
+   *         <code>false</code> if the response has been completed here
+   * @throws IOException
+   *           thrown if an IO error occurred while sending an error response.
+   * @throws AuthenticationException
+   *           thrown if executing the delegation token operation failed with
+   *           an {@link IOException}.
+   */
+  @Override
+  public boolean managementOperation(AuthenticationToken token,
+      HttpServletRequest request, HttpServletResponse response)
+      throws IOException, AuthenticationException {
+    boolean requestContinues = true;
+    String op = request.getParameter(OP_PARAM);
+    // Upper-case with a fixed locale: the default-locale variant would turn
+    // 'i' into a dotted capital under e.g. a Turkish locale, and the result
+    // would no longer match the operation names.
+    op = (op != null) ? op.toUpperCase(java.util.Locale.ENGLISH) : null;
+    if (DELEGATION_TOKEN_OPS.contains(op) &&
+        !request.getMethod().equals("OPTIONS")) {
+      TimelineDelegationTokenOperation dtOp =
+          TimelineDelegationTokenOperation.valueOf(op);
+      if (dtOp.getHttpMethod().equals(request.getMethod())) {
+        if (dtOp.requiresKerberosCredentials() && token == null) {
+          response.sendError(HttpServletResponse.SC_UNAUTHORIZED,
+              MessageFormat.format(
+                  "Operation [{0}] requires SPNEGO authentication established",
+                  dtOp));
+          requestContinues = false;
+        } else {
+          TimelineDelegationTokenSecretManagerService secretManager =
+              AHSWebApp.getInstance()
+                  .getTimelineDelegationTokenSecretManagerService();
+          try {
+            TimelineDelegationTokenResponse res = null;
+            switch (dtOp) {
+              case GETDELEGATIONTOKEN:
+                UserGroupInformation ownerUGI =
+                    UserGroupInformation.createRemoteUser(token.getUserName());
+                String renewerParam =
+                    request
+                        .getParameter(TimelineAuthenticationConsts.RENEWER_PARAM);
+                // Without an explicit renewer the requesting user renews.
+                if (renewerParam == null) {
+                  renewerParam = token.getUserName();
+                }
+                Token<?> dToken =
+                    secretManager.createToken(ownerUGI, renewerParam);
+                res = new TimelineDelegationTokenResponse();
+                res.setType(TimelineAuthenticationConsts.DELEGATION_TOKEN_URL);
+                res.setContent(dToken.encodeToUrlString());
+                break;
+              case RENEWDELEGATIONTOKEN:
+              case CANCELDELEGATIONTOKEN:
+                String tokenParam =
+                    request
+                        .getParameter(TimelineAuthenticationConsts.TOKEN_PARAM);
+                if (tokenParam == null) {
+                  response.sendError(HttpServletResponse.SC_BAD_REQUEST,
+                      MessageFormat
+                          .format(
+                              "Operation [{0}] requires the parameter [{1}]",
+                              dtOp,
+                              TimelineAuthenticationConsts.TOKEN_PARAM));
+                  requestContinues = false;
+                } else {
+                  // Both operations work on the token passed in the request.
+                  Token<TimelineDelegationTokenIdentifier> dt =
+                      new Token<TimelineDelegationTokenIdentifier>();
+                  dt.decodeFromUrlString(tokenParam);
+                  if (dtOp == TimelineDelegationTokenOperation.CANCELDELEGATIONTOKEN) {
+                    // Cancelling produces no response body.
+                    secretManager.cancelToken(dt, token.getUserName());
+                  } else {
+                    long expirationTime =
+                        secretManager.renewToken(dt, token.getUserName());
+                    res = new TimelineDelegationTokenResponse();
+                    res.setType(
+                        TimelineAuthenticationConsts.DELEGATION_TOKEN_EXPIRATION_TIME);
+                    res.setContent(expirationTime);
+                  }
+                }
+                break;
+            }
+            if (requestContinues) {
+              response.setStatus(HttpServletResponse.SC_OK);
+              if (res != null) {
+                response.setContentType(MediaType.APPLICATION_JSON);
+                Writer writer = response.getWriter();
+                mapper.writeValue(writer, res);
+                writer.write(ENTER);
+                writer.flush();
+              }
+              // The operation has been fully answered here.
+              requestContinues = false;
+            }
+          } catch (IOException e) {
+            throw new AuthenticationException(e.toString(), e);
+          }
+        }
+      } else {
+        response
+            .sendError(
+                HttpServletResponse.SC_BAD_REQUEST,
+                MessageFormat
+                    .format(
+                        "Wrong HTTP method [{0}] for operation [{1}], it should be [{2}]",
+                        request.getMethod(), dtOp, dtOp.getHttpMethod()));
+        requestContinues = false;
+      }
+    }
+    return requestContinues;
+  }
+
+  /**
+   * Authenticates a request looking for the <code>delegation</code>
+   * query-string parameter and verifying it is a valid token. If there is no
+   * <code>delegation</code> query-string parameter, it delegates the
+   * authentication to the {@link KerberosAuthenticationHandler}.
+   * 
+   * @param request
+   *          the HTTP client request.
+   * @param response
+   *          the HTTP client response.
+   * 
+   * @return the authentication token for the authenticated request.
+   * @throws IOException
+   *           thrown if an IO error occurred.
+   * @throws AuthenticationException
+   *           thrown if the authentication failed.
+   */
+  @Override
+  public AuthenticationToken authenticate(HttpServletRequest request,
+      HttpServletResponse response)
+      throws IOException, AuthenticationException {
+    AuthenticationToken token;
+    String delegationParam =
+        request
+            .getParameter(TimelineAuthenticationConsts.DELEGATION_PARAM);
+    if (delegationParam != null) {
+      Token<TimelineDelegationTokenIdentifier> dt =
+          new Token<TimelineDelegationTokenIdentifier>();
+      dt.decodeFromUrlString(delegationParam);
+      TimelineDelegationTokenSecretManagerService secretManager =
+          AHSWebApp.getInstance()
+              .getTimelineDelegationTokenSecretManagerService();
+      UserGroupInformation ugi = secretManager.verifyToken(dt);
+      final String shortName = ugi.getShortUserName();
+      // creating an ephemeral token
+      token = new AuthenticationToken(shortName, ugi.getUserName(), getType());
+      token.setExpires(0);
+    } else {
+      token = super.authenticate(request, response);
+    }
+    return token;
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineDelegationTokenSecretManagerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineDelegationTokenSecretManagerService.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineDelegationTokenSecretManagerService.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineDelegationTokenSecretManagerService.java Wed May 28 18:09:04 2014
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+/**
+ * The service wrapper of {@link TimelineDelegationTokenSecretManager}
+ */
+@Private
+@Unstable
+public class TimelineDelegationTokenSecretManagerService extends AbstractService {
+
+  // Scan interval handed to the secret manager for removing expired tokens.
+  // NOTE(review): presumably one hour expressed in milliseconds - confirm.
+  private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = 3600000;
+
+  private TimelineDelegationTokenSecretManager secretManager = null;
+  private InetSocketAddress serviceAddr = null;
+
+  public TimelineDelegationTokenSecretManagerService() {
+    super(TimelineDelegationTokenSecretManagerService.class.getName());
+  }
+
+  /**
+   * Reads the delegation key/token intervals from the configuration, creates
+   * the secret manager, starts its threads and resolves the timeline token
+   * service address.
+   *
+   * @param conf
+   *          the service configuration
+   * @throws Exception
+   *           if the secret manager threads cannot be started or the parent
+   *           initialization fails
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    long secretKeyInterval =
+        conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY,
+            YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
+    long tokenMaxLifetime =
+        conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY,
+            YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
+    long tokenRenewInterval =
+        conf.getLong(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+            YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+    secretManager = new TimelineDelegationTokenSecretManager(secretKeyInterval,
+        tokenMaxLifetime, tokenRenewInterval,
+        DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL);
+    secretManager.startThreads();
+
+    serviceAddr = TimelineUtils.getTimelineTokenServiceAddress(getConfig());
+    // Chain to the parent's lifecycle hook rather than re-entering the public
+    // init() entry point from inside the hook.
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Stops the secret manager threads, if the secret manager was created.
+   *
+   * @throws Exception
+   *           if the parent stop hook fails
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    // The service may be stopped without a (successful) init, in which case
+    // there is no secret manager to stop.
+    if (secretManager != null) {
+      secretManager.stopThreads();
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Creates a delegation token.
+   *
+   * @param ugi UGI creating the token.
+   * @param renewer token renewer; if <code>null</code>, the short user name
+   *          of <code>ugi</code> is used.
+   * @return new delegation token.
+   * @throws IOException thrown if the token could not be created.
+   */
+  public Token<TimelineDelegationTokenIdentifier> createToken(
+      UserGroupInformation ugi, String renewer) throws IOException {
+    renewer = (renewer == null) ? ugi.getShortUserName() : renewer;
+    String user = ugi.getUserName();
+    Text owner = new Text(user);
+    Text realUser = null;
+    if (ugi.getRealUser() != null) {
+      realUser = new Text(ugi.getRealUser().getUserName());
+    }
+    TimelineDelegationTokenIdentifier tokenIdentifier =
+        new TimelineDelegationTokenIdentifier(owner, new Text(renewer), realUser);
+    Token<TimelineDelegationTokenIdentifier> token =
+        new Token<TimelineDelegationTokenIdentifier>(tokenIdentifier, secretManager);
+    SecurityUtil.setTokenService(token, serviceAddr);
+    return token;
+  }
+
+  /**
+   * Renews a delegation token.
+   *
+   * @param token delegation token to renew.
+   * @param renewer token renewer.
+   * @return the value returned by the secret manager for the renewal; callers
+   *         in this module report it as the token expiration time.
+   * @throws IOException thrown if the token could not be renewed.
+   */
+  public long renewToken(Token<TimelineDelegationTokenIdentifier> token,
+      String renewer) throws IOException {
+    return secretManager.renewToken(token, renewer);
+  }
+
+  /**
+   * Cancels a delegation token.
+   *
+   * @param token delegation token to cancel.
+   * @param canceler token canceler.
+   * @throws IOException thrown if the token could not be canceled.
+   */
+  public void cancelToken(Token<TimelineDelegationTokenIdentifier> token,
+      String canceler) throws IOException {
+    secretManager.cancelToken(token, canceler);
+  }
+
+  /**
+   * Verifies a delegation token.
+   *
+   * @param token delegation token to verify.
+   * @return the UGI for the token.
+   * @throws IOException thrown if the token could not be verified.
+   */
+  public UserGroupInformation verifyToken(Token<TimelineDelegationTokenIdentifier> token)
+    throws IOException {
+    ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+    DataInputStream dis = new DataInputStream(buf);
+    TimelineDelegationTokenIdentifier id = new TimelineDelegationTokenIdentifier();
+    try {
+      // Rebuild the identifier from the token bytes, then check the password.
+      id.readFields(dis);
+      secretManager.verifyToken(id, token.getPassword());
+    } finally {
+      dis.close();
+    }
+    return id.getUser();
+  }
+
+  /**
+   * The delegation token secret manager of the timeline server, producing
+   * {@link TimelineDelegationTokenIdentifier}s.
+   */
+  @Private
+  @Unstable
+  public static class TimelineDelegationTokenSecretManager extends
+      AbstractDelegationTokenSecretManager<TimelineDelegationTokenIdentifier> {
+
+    /**
+     * Create a timeline secret manager
+     * 
+     * @param delegationKeyUpdateInterval
+     *          the interval for rolling new secret keys. NOTE(review): the
+     *          earlier documentation said seconds; the value comes from the
+     *          YARN delegation key update interval setting, which is
+     *          presumably in milliseconds - confirm against the superclass.
+     * @param delegationTokenMaxLifetime
+     *          the maximum lifetime of the delegation tokens
+     * @param delegationTokenRenewInterval
+     *          how often the tokens must be renewed
+     * @param delegationTokenRemoverScanInterval
+     *          how often the tokens are scanned for expired tokens
+     */
+    public TimelineDelegationTokenSecretManager(long delegationKeyUpdateInterval,
+        long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
+        long delegationTokenRemoverScanInterval) {
+      super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+          delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+    }
+
+    /**
+     * Creates an empty identifier instance.
+     *
+     * @return a new {@link TimelineDelegationTokenIdentifier}
+     */
+    @Override
+    public TimelineDelegationTokenIdentifier createIdentifier() {
+      return new TimelineDelegationTokenIdentifier();
+    }
+
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java Wed May 28 18:09:04 2014
@@ -0,0 +1,539 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline.webapp;
+
+import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timeline.EntityIdentifier;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.hadoop.yarn.server.timeline.TimelineStore;
+import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.apache.hadoop.yarn.webapp.BadRequestException;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+@Path("/ws/v1/timeline")
+//TODO: support XML serialization/deserialization
+public class TimelineWebServices {
+
+  private static final Log LOG = LogFactory.getLog(TimelineWebServices.class);
+
+  private TimelineStore store;
+  private TimelineACLsManager timelineACLsManager;
+
+  /**
+   * Creates the web services with the store that the endpoints read from and
+   * write to, and the ACL manager that is consulted for access checks.
+   *
+   * @param store
+   *          the timeline store backing the endpoints
+   * @param timelineACLsManager
+   *          the manager used to check a caller's access to an entity
+   */
+  @Inject
+  public TimelineWebServices(TimelineStore store,
+      TimelineACLsManager timelineACLsManager) {
+    this.store = store;
+    this.timelineACLsManager = timelineACLsManager;
+  }
+
+  /**
+   * Bean returned by the {@code about()} endpoint. It carries a single
+   * descriptive string, exposed through the getter as the "About" element of
+   * the "about" root element.
+   */
+  @XmlRootElement(name = "about")
+  @XmlAccessorType(XmlAccessType.NONE)
+  @Public
+  @Unstable
+  public static class AboutInfo {
+
+    // The description text; stays null when the no-argument constructor is
+    // used and no setter call follows.
+    private String about;
+
+    /** No-argument constructor; leaves the description unset. */
+    public AboutInfo() {
+
+    }
+
+    /**
+     * @param about
+     *          the description text to carry
+     */
+    public AboutInfo(String about) {
+      this.about = about;
+    }
+
+    /**
+     * @return the description text
+     */
+    @XmlElement(name = "About")
+    public String getAbout() {
+      return about;
+    }
+
+    /**
+     * @param about
+     *          the description text to carry
+     */
+    public void setAbout(String about) {
+      this.about = about;
+    }
+
+  }
+
+  /**
+   * Return the description of the timeline web services.
+   *
+   * @param req
+   *          the servlet request (not read by this method)
+   * @param res
+   *          the servlet response, whose content type is reset via
+   *          {@code init}
+   * @return an {@link AboutInfo} holding the fixed text "Timeline API"
+   */
+  @GET
+  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public AboutInfo about(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res) {
+    init(res);
+    return new AboutInfo("Timeline API");
+  }
+
+  /**
+   * Return a list of entities that match the given parameters.
+   *
+   * Every entity returned by the store is checked against the ACLs for the
+   * calling user; entities the caller may not access, or whose access check
+   * fails with a {@link YarnException}, are dropped from the result.
+   *
+   * @return the matching, accessible entities; an empty
+   *         {@link TimelineEntities} when the store returns {@code null}
+   * @throws BadRequestException
+   *           if windowStart, windowEnd, fromTs or limit is not numeric, or
+   *           if another query parameter is rejected as an illegal argument
+   * @throws WebApplicationException
+   *           with status 500 if the store fails with an IOException
+   */
+  @GET
+  @Path("/{entityType}")
+  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public TimelineEntities getEntities(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("entityType") String entityType,
+      @QueryParam("primaryFilter") String primaryFilter,
+      @QueryParam("secondaryFilter") String secondaryFilter,
+      @QueryParam("windowStart") String windowStart,
+      @QueryParam("windowEnd") String windowEnd,
+      @QueryParam("fromId") String fromId,
+      @QueryParam("fromTs") String fromTs,
+      @QueryParam("limit") String limit,
+      @QueryParam("fields") String fields) {
+    init(res);
+    TimelineEntities entities = null;
+    try {
+      EnumSet<Field> fieldEnums = parseFieldsStr(fields, ",");
+      // Primary filters are always requested from the store; "modified"
+      // records that the caller did not ask for them, so they are stripped
+      // again before returning.
+      // NOTE(review): presumably the ACL check reads the owner from the
+      // primary filters -- confirm against TimelineACLsManager.
+      boolean modified = extendFields(fieldEnums);
+      UserGroupInformation callerUGI = getUser(req);
+      entities = store.getEntities(
+          parseStr(entityType),
+          parseLongStr(limit),
+          parseLongStr(windowStart),
+          parseLongStr(windowEnd),
+          parseStr(fromId),
+          parseLongStr(fromTs),
+          parsePairStr(primaryFilter, ":"),
+          parsePairsStr(secondaryFilter, ",", ":"),
+          fieldEnums);
+      if (entities != null) {
+        Iterator<TimelineEntity> entitiesItr =
+            entities.getEntities().iterator();
+        while (entitiesItr.hasNext()) {
+          TimelineEntity entity = entitiesItr.next();
+          try {
+            // check ACLs
+            if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
+              entitiesItr.remove();
+            } else {
+              // clean up system data
+              if (modified) {
+                entity.setPrimaryFilters(null);
+              } else {
+                cleanupOwnerInfo(entity);
+              }
+            }
+          } catch (YarnException e) {
+            // An entity whose access cannot be verified is not returned.
+            LOG.error("Error when verifying access for user " + callerUGI
+                + " on the timeline entity "
+                + new EntityIdentifier(entity.getEntityId(),
+                    entity.getEntityType()), e);
+            entitiesItr.remove();
+          }
+        }
+      }
+    } catch (NumberFormatException e) {
+      // fromTs is parsed as a long as well, so it is named in the message.
+      throw new BadRequestException(
+          "windowStart, windowEnd, fromTs or limit is not a numeric value.");
+    } catch (IllegalArgumentException e) {
+      throw new BadRequestException("requested invalid field.");
+    } catch (IOException e) {
+      LOG.error("Error getting entities", e);
+      throw new WebApplicationException(e,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+    if (entities == null) {
+      return new TimelineEntities();
+    }
+    return entities;
+  }
+
+  /**
+   * Look up one entity by its type and Id.
+   *
+   * The entity is returned only when the store has it and the calling user
+   * passes the ACL check; otherwise the request ends in a
+   * {@link NotFoundException}.
+   */
+  @GET
+  @Path("/{entityType}/{entityId}")
+  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public TimelineEntity getEntity(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("entityType") String entityType,
+      @PathParam("entityId") String entityId,
+      @QueryParam("fields") String fields) {
+    init(res);
+    TimelineEntity found = null;
+    try {
+      EnumSet<Field> requestedFields = parseFieldsStr(fields, ",");
+      boolean primaryFiltersAdded = extendFields(requestedFields);
+      found = store.getEntity(
+          parseStr(entityId), parseStr(entityType), requestedFields);
+      if (found != null) {
+        UserGroupInformation callerUGI = getUser(req);
+        if (timelineACLsManager.checkAccess(callerUGI, found)) {
+          // accessible: strip the system data before handing it out
+          if (primaryFiltersAdded) {
+            found.setPrimaryFilters(null);
+          } else {
+            cleanupOwnerInfo(found);
+          }
+        } else {
+          // not accessible: report it the same way as a missing entity
+          found = null;
+        }
+      }
+    } catch (IllegalArgumentException e) {
+      throw new BadRequestException(
+          "requested invalid field.");
+    } catch (IOException e) {
+      LOG.error("Error getting entity", e);
+      throw new WebApplicationException(e,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    } catch (YarnException e) {
+      LOG.error("Error getting entity", e);
+      throw new WebApplicationException(e,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+    if (found != null) {
+      return found;
+    }
+    throw new NotFoundException("Timeline entity "
+        + new EntityIdentifier(parseStr(entityId), parseStr(entityType))
+        + " is not found");
+  }
+
+  /**
+   * Fetch the events matching the given parameters.
+   *
+   * For each group of events the owning entity is loaded with its primary
+   * filters and checked against the ACLs for the calling user. Groups that
+   * fail the check, or whose check throws, are removed from the result.
+   */
+  @GET
+  @Path("/{entityType}/events")
+  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public TimelineEvents getEvents(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("entityType") String entityType,
+      @QueryParam("entityId") String entityId,
+      @QueryParam("eventType") String eventType,
+      @QueryParam("windowStart") String windowStart,
+      @QueryParam("windowEnd") String windowEnd,
+      @QueryParam("limit") String limit) {
+    init(res);
+    TimelineEvents events = null;
+    try {
+      UserGroupInformation callerUGI = getUser(req);
+      // Parsed in the same order as the store call consumes them.
+      String type = parseStr(entityType);
+      SortedSet<String> entityIds = parseArrayStr(entityId, ",");
+      Long maxEvents = parseLongStr(limit);
+      Long start = parseLongStr(windowStart);
+      Long end = parseLongStr(windowEnd);
+      SortedSet<String> eventTypes = parseArrayStr(eventType, ",");
+      events = store.getEntityTimelines(
+          type, entityIds, maxEvents, start, end, eventTypes);
+      if (events != null) {
+        for (Iterator<TimelineEvents.EventsOfOneEntity> it =
+            events.getAllEvents().iterator(); it.hasNext();) {
+          TimelineEvents.EventsOfOneEntity group = it.next();
+          try {
+            TimelineEntity owningEntity = store.getEntity(
+                group.getEntityId(),
+                group.getEntityType(),
+                EnumSet.of(Field.PRIMARY_FILTERS));
+            boolean accessible =
+                timelineACLsManager.checkAccess(callerUGI, owningEntity);
+            if (!accessible) {
+              it.remove();
+            }
+          } catch (Exception e) {
+            LOG.error("Error when verifying access for user " + callerUGI
+                + " on the events of the timeline entity "
+                + new EntityIdentifier(group.getEntityId(),
+                    group.getEntityType()), e);
+            it.remove();
+          }
+        }
+      }
+    } catch (NumberFormatException e) {
+      throw new BadRequestException(
+          "windowStart, windowEnd or limit is not a numeric value.");
+    } catch (IOException e) {
+      LOG.error("Error getting entity timelines", e);
+      throw new WebApplicationException(e,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+    return events == null ? new TimelineEvents() : events;
+  }
+
+  /**
+   * Store the given entities into the timeline store, and return the errors
+   * that happen during storing.
+   */
+  @POST
+  @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public TimelinePutResponse postEntities(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      TimelineEntities entities) {
+    init(res);
+    if (entities == null) {
+      return new TimelinePutResponse();
+    }
+    UserGroupInformation callerUGI = getUser(req);
+    try {
+      List<EntityIdentifier> entityIDs = new ArrayList<EntityIdentifier>();
+      TimelineEntities entitiesToPut = new TimelineEntities();
+      List<TimelinePutResponse.TimelinePutError> errors =
+          new ArrayList<TimelinePutResponse.TimelinePutError>();
+      for (TimelineEntity entity : entities.getEntities()) {
+        EntityIdentifier entityID =
+            new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
+
+        // check if there is existing entity
+        TimelineEntity existingEntity = null;
+        try {
+          existingEntity =
+              store.getEntity(entityID.getId(), entityID.getType(),
+                  EnumSet.of(Field.PRIMARY_FILTERS));
+          if (existingEntity != null
+              && !timelineACLsManager.checkAccess(callerUGI, existingEntity)) {
+            throw new YarnException("The timeline entity " + entityID
+                + " was not put by " + callerUGI + " before");
+          }
+        } catch (Exception e) {
+          // Skip the entity which already exists and was put by others
+          LOG.warn("Skip the timeline entity: " + entityID + ", because "
+              + e.getMessage());
+          TimelinePutResponse.TimelinePutError error =
+              new TimelinePutResponse.TimelinePutError();
+          error.setEntityId(entityID.getId());
+          error.setEntityType(entityID.getType());
+          error.setErrorCode(
+              TimelinePutResponse.TimelinePutError.ACCESS_DENIED);
+          errors.add(error);
+          continue;
+        }
+
+        // inject owner information for the access check if this is the first
+        // time to post the entity, in case it's the admin who is updating
+        // the timeline data.
+        try {
+          if (existingEntity == null) {
+            injectOwnerInfo(entity,
+                callerUGI == null ? "" : callerUGI.getShortUserName());
+          }
+        } catch (YarnException e) {
+          // Skip the entity which messes up the primary filter and record the
+          // error
+          LOG.warn("Skip the timeline entity: " + entityID + ", because "
+              + e.getMessage());
+          TimelinePutResponse.TimelinePutError error =
+              new TimelinePutResponse.TimelinePutError();
+          error.setEntityId(entityID.getId());
+          error.setEntityType(entityID.getType());
+          error.setErrorCode(
+              TimelinePutResponse.TimelinePutError.SYSTEM_FILTER_CONFLICT);
+          errors.add(error);
+          continue;
+        }
+
+        entityIDs.add(entityID);
+        entitiesToPut.addEntity(entity);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Storing the entity " + entityID + ", JSON-style content: "
+              + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs));
+      }
+      TimelinePutResponse response =  store.put(entitiesToPut);
+      // add the errors of timeline system filter key conflict
+      response.addErrors(errors);
+      return response;
+    } catch (IOException e) {
+      LOG.error("Error putting entities", e);
+      throw new WebApplicationException(e,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  /**
+   * Resets the content type of the response by setting it to {@code null}.
+   * Called at the start of every endpoint method.
+   * NOTE(review): presumably this lets the JAX-RS runtime pick the content
+   * type from the {@code @Produces} annotation -- confirm.
+   */
+  private void init(HttpServletResponse response) {
+    response.setContentType(null);
+  }
+
+  /**
+   * Break a delimited string into a sorted set of trimmed tokens.
+   *
+   * @param str
+   *          the delimited string, may be null
+   * @param delimiter
+   *          the regular expression to split on
+   * @return the sorted, de-duplicated tokens, or null when str is null
+   */
+  private static SortedSet<String> parseArrayStr(String str, String delimiter) {
+    if (str == null) {
+      return null;
+    }
+    SortedSet<String> tokens = new TreeSet<String>();
+    for (String token : str.split(delimiter)) {
+      tokens.add(token.trim());
+    }
+    return tokens;
+  }
+
+  /**
+   * Parse a "name&lt;delimiter&gt;value" string into a {@link NameValuePair}.
+   *
+   * The string is split on the first occurrence of the delimiter only, so the
+   * value part may itself contain the delimiter. The value is first read
+   * through {@code GenericObjectMapper.OBJECT_READER}; if that fails it is
+   * kept as a plain string.
+   *
+   * @param str
+   *          the pair to parse, may be null
+   * @param delimiter
+   *          the regular expression separating name and value
+   * @return the parsed pair, or null when str is null
+   * @throws IllegalArgumentException
+   *           if str does not contain the delimiter; the endpoint that parses
+   *           filters turns this into a bad-request response instead of
+   *           failing with an index error
+   */
+  private static NameValuePair parsePairStr(String str, String delimiter) {
+    if (str == null) {
+      return null;
+    }
+    String[] strs = str.split(delimiter, 2);
+    if (strs.length < 2) {
+      // Without this guard strs[1] fails in the try block and again in the
+      // fallback below, escaping as an ArrayIndexOutOfBoundsException.
+      throw new IllegalArgumentException("Expected a name" + delimiter
+          + "value pair but got: " + str);
+    }
+    String name = strs[0].trim();
+    String rawValue = strs[1].trim();
+    try {
+      return new NameValuePair(name,
+          GenericObjectMapper.OBJECT_READER.readValue(rawValue));
+    } catch (Exception e) {
+      // didn't work as an Object, keep it as a String
+      return new NameValuePair(name, rawValue);
+    }
+  }
+
+  /**
+   * Parse a list of name/value pairs.
+   *
+   * @param str
+   *          the delimited list of pairs, may be null
+   * @param aDelimiter
+   *          the regular expression separating the pairs from each other
+   * @param pDelimiter
+   *          the regular expression separating a name from its value
+   * @return the parsed pairs (duplicates collapsed), or null when str is null
+   */
+  private static Collection<NameValuePair> parsePairsStr(
+      String str, String aDelimiter, String pDelimiter) {
+    if (str == null) {
+      return null;
+    }
+    Set<NameValuePair> parsed = new HashSet<NameValuePair>();
+    for (String pairStr : str.split(aDelimiter)) {
+      NameValuePair pair = parsePairStr(pairStr, pDelimiter);
+      parsed.add(pair);
+    }
+    return parsed;
+  }
+
+  private static EnumSet<Field> parseFieldsStr(String str, String delimiter) {
+    if (str == null) {
+      return null;
+    }
+    String[] strs = str.split(delimiter);
+    List<Field> fieldList = new ArrayList<Field>();
+    for (String s : strs) {
+      s = s.trim().toUpperCase();
+      if (s.equals("EVENTS")) {
+        fieldList.add(Field.EVENTS);
+      } else if (s.equals("LASTEVENTONLY")) {
+        fieldList.add(Field.LAST_EVENT_ONLY);
+      } else if (s.equals("RELATEDENTITIES")) {
+        fieldList.add(Field.RELATED_ENTITIES);
+      } else if (s.equals("PRIMARYFILTERS")) {
+        fieldList.add(Field.PRIMARY_FILTERS);
+      } else if (s.equals("OTHERINFO")) {
+        fieldList.add(Field.OTHER_INFO);
+      } else {
+        throw new IllegalArgumentException("Requested nonexistent field " + s);
+      }
+    }
+    if (fieldList.size() == 0) {
+      return null;
+    }
+    Field f1 = fieldList.remove(fieldList.size() - 1);
+    if (fieldList.size() == 0) {
+      return EnumSet.of(f1);
+    } else {
+      return EnumSet.of(f1, fieldList.toArray(new Field[fieldList.size()]));
+    }
+  }
+
+  /**
+   * Make sure a non-null field set contains {@code Field.PRIMARY_FILTERS}.
+   *
+   * @param fieldEnums
+   *          the field set to extend in place, may be null
+   * @return true if PRIMARY_FILTERS was added by this call, false if the set
+   *         is null or already contained it
+   */
+  private static boolean extendFields(EnumSet<Field> fieldEnums) {
+    if (fieldEnums == null || fieldEnums.contains(Field.PRIMARY_FILTERS)) {
+      return false;
+    }
+    fieldEnums.add(Field.PRIMARY_FILTERS);
+    return true;
+  }
+  /**
+   * Parse a trimmed string as a long.
+   *
+   * @return the parsed value, or null when str is null
+   * @throws NumberFormatException
+   *           if the trimmed string is not a valid long
+   */
+  private static Long parseLongStr(String str) {
+    if (str == null) {
+      return null;
+    }
+    return Long.parseLong(str.trim());
+  }
+
+  /**
+   * Trim a string, passing null through unchanged.
+   *
+   * @return the trimmed string, or null when str is null
+   */
+  private static String parseStr(String str) {
+    if (str == null) {
+      return null;
+    }
+    return str.trim();
+  }
+
+  /**
+   * Build the caller's {@link UserGroupInformation} from the remote user of
+   * the request.
+   *
+   * @return the remote user's UGI, or null when the request carries no remote
+   *         user
+   */
+  private static UserGroupInformation getUser(HttpServletRequest req) {
+    String remoteUser = req.getRemoteUser();
+    if (remoteUser == null) {
+      return null;
+    }
+    return UserGroupInformation.createRemoteUser(remoteUser);
+  }
+
+  /**
+   * Record the owner of an entity as a primary filter under the system key
+   * {@code TimelineStore.SystemFilter.ENTITY_OWNER}.
+   *
+   * @param timelineEntity
+   *          the entity to tag; modified in place
+   * @param owner
+   *          the owner name to store
+   * @throws YarnException
+   *           if the entity already carries a primary filter under the system
+   *           key, which users are not allowed to set themselves
+   */
+  private static void injectOwnerInfo(TimelineEntity timelineEntity,
+      String owner) throws YarnException {
+    // The filter is added and removed under the key's string form, so the
+    // conflict check has to look it up by that same string; looking up the
+    // enum constant itself can never match a string key.
+    String ownerKey = TimelineStore.SystemFilter.ENTITY_OWNER.toString();
+    if (timelineEntity.getPrimaryFilters() != null &&
+        timelineEntity.getPrimaryFilters().containsKey(ownerKey)) {
+      throw new YarnException(
+          "User should not use the timeline system filter key: "
+              + TimelineStore.SystemFilter.ENTITY_OWNER);
+    }
+    timelineEntity.addPrimaryFilter(ownerKey, owner);
+  }
+
+  /**
+   * Drop the system owner entry from the primary filters of an entity, if
+   * the entity has primary filters at all.
+   */
+  private static void cleanupOwnerInfo(TimelineEntity timelineEntity) {
+    if (timelineEntity.getPrimaryFilters() == null) {
+      return;
+    }
+    String ownerKey = TimelineStore.SystemFilter.ENTITY_OWNER.toString();
+    timelineEntity.getPrimaryFilters().remove(ownerKey);
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestGenericObjectMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestGenericObjectMapper.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestGenericObjectMapper.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestGenericObjectMapper.java Wed May 28 18:09:04 2014
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TestGenericObjectMapper {
+
+  @Test
+  public void testEncoding() {
+    testEncoding(Long.MAX_VALUE);
+    testEncoding(Long.MIN_VALUE);
+    testEncoding(0l);
+    testEncoding(128l);
+    testEncoding(256l);
+    testEncoding(512l);
+    testEncoding(-256l);
+  }
+
+  private static void testEncoding(long l) {
+    byte[] b = GenericObjectMapper.writeReverseOrderedLong(l);
+    assertEquals("error decoding", l,
+        GenericObjectMapper.readReverseOrderedLong(b, 0));
+    byte[] buf = new byte[16];
+    System.arraycopy(b, 0, buf, 5, 8);
+    assertEquals("error decoding at offset", l,
+        GenericObjectMapper.readReverseOrderedLong(buf, 5));
+    if (l > Long.MIN_VALUE) {
+      byte[] a = GenericObjectMapper.writeReverseOrderedLong(l-1);
+      assertEquals("error preserving ordering", 1,
+          WritableComparator.compareBytes(a, 0, a.length, b, 0, b.length));
+    }
+    if (l < Long.MAX_VALUE) {
+      byte[] c = GenericObjectMapper.writeReverseOrderedLong(l+1);
+      assertEquals("error preserving ordering", 1,
+          WritableComparator.compareBytes(b, 0, b.length, c, 0, c.length));
+    }
+  }
+
+  private static void verify(Object o) throws IOException {
+    assertEquals(o, GenericObjectMapper.read(GenericObjectMapper.write(o)));
+  }
+
+  @Test
+  public void testValueTypes() throws IOException {
+    verify(Integer.MAX_VALUE);
+    verify(Integer.MIN_VALUE);
+    assertEquals(Integer.MAX_VALUE, GenericObjectMapper.read(
+        GenericObjectMapper.write((long) Integer.MAX_VALUE)));
+    assertEquals(Integer.MIN_VALUE, GenericObjectMapper.read(
+        GenericObjectMapper.write((long) Integer.MIN_VALUE)));
+    verify((long)Integer.MAX_VALUE + 1l);
+    verify((long)Integer.MIN_VALUE - 1l);
+
+    verify(Long.MAX_VALUE);
+    verify(Long.MIN_VALUE);
+
+    assertEquals(42, GenericObjectMapper.read(GenericObjectMapper.write(42l)));
+    verify(42);
+    verify(1.23);
+    verify("abc");
+    verify(true);
+    List<String> list = new ArrayList<String>();
+    list.add("123");
+    list.add("abc");
+    verify(list);
+    Map<String,String> map = new HashMap<String,String>();
+    map.put("k1","v1");
+    map.put("k2","v2");
+    verify(map);
+  }
+
+}