You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/02/22 21:55:07 UTC
svn commit: r1570922 [2/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/apptimeline/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline...
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java?rev=1570922&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java Sat Feb 22 20:55:06 2014
@@ -0,0 +1,873 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBIterator;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.readReverseOrderedLong;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.writeReverseOrderedLong;
+
+/**
+ * An implementation of a timeline store backed by leveldb.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class LeveldbTimelineStore extends AbstractService
+ implements TimelineStore {
+ private static final Log LOG = LogFactory
+ .getLog(LeveldbTimelineStore.class);
+
+ private static final String FILENAME = "leveldb-timeline-store.ldb";
+
+ private static final byte[] START_TIME_LOOKUP_PREFIX = "k".getBytes();
+ private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes();
+ private static final byte[] INDEXED_ENTRY_PREFIX = "i".getBytes();
+
+ private static final byte[] PRIMARY_FILTER_COLUMN = "f".getBytes();
+ private static final byte[] OTHER_INFO_COLUMN = "i".getBytes();
+ private static final byte[] RELATED_COLUMN = "r".getBytes();
+ private static final byte[] TIME_COLUMN = "t".getBytes();
+
+ private static final byte[] EMPTY_BYTES = new byte[0];
+
+ private static final int START_TIME_CACHE_SIZE = 10000;
+
+ @SuppressWarnings("unchecked")
+ private final Map<EntityIdentifier, Long> startTimeCache =
+ Collections.synchronizedMap(new LRUMap(START_TIME_CACHE_SIZE));
+
+ private DB db;
+
+ public LeveldbTimelineStore() {
+ super(LeveldbTimelineStore.class.getName());
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ Options options = new Options();
+ options.createIfMissing(true);
+ JniDBFactory factory = new JniDBFactory();
+ String path = conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH);
+ File p = new File(path);
+ if (!p.exists())
+ if (!p.mkdirs())
+ throw new IOException("Couldn't create directory for leveldb " +
+ "timeline store " + path);
+ LOG.info("Using leveldb path " + path);
+ db = factory.open(new File(path, FILENAME), options);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ IOUtils.cleanup(LOG, db);
+ super.serviceStop();
+ }
+
+ private static class KeyBuilder {
+ private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10;
+ private byte[][] b;
+ private boolean[] useSeparator;
+ private int index;
+ private int length;
+
+ public KeyBuilder(int size) {
+ b = new byte[size][];
+ useSeparator = new boolean[size];
+ index = 0;
+ length = 0;
+ }
+
+ public static KeyBuilder newInstance() {
+ return new KeyBuilder(MAX_NUMBER_OF_KEY_ELEMENTS);
+ }
+
+ public KeyBuilder add(String s) {
+ return add(s.getBytes(), true);
+ }
+
+ public KeyBuilder add(byte[] t) {
+ return add(t, false);
+ }
+
+ public KeyBuilder add(byte[] t, boolean sep) {
+ b[index] = t;
+ useSeparator[index] = sep;
+ length += t.length;
+ if (sep)
+ length++;
+ index++;
+ return this;
+ }
+
+ public byte[] getBytes() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
+ for (int i = 0; i < index; i++) {
+ baos.write(b[i]);
+ if (i < index-1 && useSeparator[i])
+ baos.write(0x0);
+ }
+ return baos.toByteArray();
+ }
+
+ public byte[] getBytesForLookup() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
+ for (int i = 0; i < index; i++) {
+ baos.write(b[i]);
+ if (useSeparator[i])
+ baos.write(0x0);
+ }
+ return baos.toByteArray();
+ }
+ }
+
+ private static class KeyParser {
+ private final byte[] b;
+ private int offset;
+
+ public KeyParser(byte[] b, int offset) {
+ this.b = b;
+ this.offset = offset;
+ }
+
+ public String getNextString() throws IOException {
+ if (offset >= b.length)
+ throw new IOException(
+ "tried to read nonexistent string from byte array");
+ int i = 0;
+ while (offset+i < b.length && b[offset+i] != 0x0)
+ i++;
+ String s = new String(b, offset, i);
+ offset = offset + i + 1;
+ return s;
+ }
+
+ public long getNextLong() throws IOException {
+ if (offset+8 >= b.length)
+ throw new IOException("byte array ran out when trying to read long");
+ long l = readReverseOrderedLong(b, offset);
+ offset += 8;
+ return l;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+ }
+
+ @Override
+ public TimelineEntity getEntity(String entityId, String entityType,
+ EnumSet<Field> fields) throws IOException {
+ DBIterator iterator = null;
+ try {
+ byte[] revStartTime = getStartTime(entityId, entityType, null, null, null);
+ if (revStartTime == null)
+ return null;
+ byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
+ .add(entityType).add(revStartTime).add(entityId).getBytesForLookup();
+
+ iterator = db.iterator();
+ iterator.seek(prefix);
+
+ return getEntity(entityId, entityType,
+ readReverseOrderedLong(revStartTime, 0), fields, iterator, prefix,
+ prefix.length);
+ } finally {
+ IOUtils.cleanup(LOG, iterator);
+ }
+ }
+
+ /**
+ * Read entity from a db iterator. If no information is found in the
+ * specified fields for this entity, return null.
+ */
+ private static TimelineEntity getEntity(String entityId, String entityType,
+ Long startTime, EnumSet<Field> fields, DBIterator iterator,
+ byte[] prefix, int prefixlen) throws IOException {
+ if (fields == null)
+ fields = EnumSet.allOf(Field.class);
+
+ TimelineEntity entity = new TimelineEntity();
+ boolean events = false;
+ boolean lastEvent = false;
+ if (fields.contains(Field.EVENTS)) {
+ events = true;
+ entity.setEvents(new ArrayList<TimelineEvent>());
+ } else if (fields.contains(Field.LAST_EVENT_ONLY)) {
+ lastEvent = true;
+ entity.setEvents(new ArrayList<TimelineEvent>());
+ }
+ else {
+ entity.setEvents(null);
+ }
+ boolean relatedEntities = false;
+ if (fields.contains(Field.RELATED_ENTITIES)) {
+ relatedEntities = true;
+ } else {
+ entity.setRelatedEntities(null);
+ }
+ boolean primaryFilters = false;
+ if (fields.contains(Field.PRIMARY_FILTERS)) {
+ primaryFilters = true;
+ } else {
+ entity.setPrimaryFilters(null);
+ }
+ boolean otherInfo = false;
+ if (fields.contains(Field.OTHER_INFO)) {
+ otherInfo = true;
+ entity.setOtherInfo(new HashMap<String, Object>());
+ } else {
+ entity.setOtherInfo(null);
+ }
+
+ // iterate through the entity's entry, parsing information if it is part
+ // of a requested field
+ for (; iterator.hasNext(); iterator.next()) {
+ byte[] key = iterator.peekNext().getKey();
+ if (!prefixMatches(prefix, prefixlen, key))
+ break;
+ if (key[prefixlen] == PRIMARY_FILTER_COLUMN[0]) {
+ if (primaryFilters) {
+ addPrimaryFilter(entity, key,
+ prefixlen + PRIMARY_FILTER_COLUMN.length);
+ }
+ } else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) {
+ if (otherInfo) {
+ entity.addOtherInfo(parseRemainingKey(key,
+ prefixlen + OTHER_INFO_COLUMN.length),
+ GenericObjectMapper.read(iterator.peekNext().getValue()));
+ }
+ } else if (key[prefixlen] == RELATED_COLUMN[0]) {
+ if (relatedEntities) {
+ addRelatedEntity(entity, key,
+ prefixlen + RELATED_COLUMN.length);
+ }
+ } else if (key[prefixlen] == TIME_COLUMN[0]) {
+ if (events || (lastEvent && entity.getEvents().size() == 0)) {
+ TimelineEvent event = getEntityEvent(null, key, prefixlen +
+ TIME_COLUMN.length, iterator.peekNext().getValue());
+ if (event != null) {
+ entity.addEvent(event);
+ }
+ }
+ } else {
+ LOG.warn(String.format("Found unexpected column for entity %s of " +
+ "type %s (0x%02x)", entityId, entityType, key[prefixlen]));
+ }
+ }
+
+ entity.setEntityId(entityId);
+ entity.setEntityType(entityType);
+ entity.setStartTime(startTime);
+
+ return entity;
+ }
+
+ @Override
+ public TimelineEvents getEntityTimelines(String entityType,
+ SortedSet<String> entityIds, Long limit, Long windowStart,
+ Long windowEnd, Set<String> eventType) throws IOException {
+ TimelineEvents events = new TimelineEvents();
+ if (entityIds == null || entityIds.isEmpty())
+ return events;
+ // create a lexicographically-ordered map from start time to entities
+ Map<byte[], List<EntityIdentifier>> startTimeMap = new TreeMap<byte[],
+ List<EntityIdentifier>>(new Comparator<byte[]>() {
+ @Override
+ public int compare(byte[] o1, byte[] o2) {
+ return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0,
+ o2.length);
+ }
+ });
+ DBIterator iterator = null;
+ try {
+ // look up start times for the specified entities
+ // skip entities with no start time
+ for (String entity : entityIds) {
+ byte[] startTime = getStartTime(entity, entityType, null, null, null);
+ if (startTime != null) {
+ List<EntityIdentifier> entities = startTimeMap.get(startTime);
+ if (entities == null) {
+ entities = new ArrayList<EntityIdentifier>();
+ startTimeMap.put(startTime, entities);
+ }
+ entities.add(new EntityIdentifier(entity, entityType));
+ }
+ }
+ for (Entry<byte[], List<EntityIdentifier>> entry :
+ startTimeMap.entrySet()) {
+ // look up the events matching the given parameters (limit,
+ // start time, end time, event types) for entities whose start times
+ // were found and add the entities to the return list
+ byte[] revStartTime = entry.getKey();
+ for (EntityIdentifier entityID : entry.getValue()) {
+ EventsOfOneEntity entity = new EventsOfOneEntity();
+ entity.setEntityId(entityID.getId());
+ entity.setEntityType(entityType);
+ events.addEvent(entity);
+ KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
+ .add(entityType).add(revStartTime).add(entityID.getId())
+ .add(TIME_COLUMN);
+ byte[] prefix = kb.getBytesForLookup();
+ if (windowEnd == null) {
+ windowEnd = Long.MAX_VALUE;
+ }
+ byte[] revts = writeReverseOrderedLong(windowEnd);
+ kb.add(revts);
+ byte[] first = kb.getBytesForLookup();
+ byte[] last = null;
+ if (windowStart != null) {
+ last = KeyBuilder.newInstance().add(prefix)
+ .add(writeReverseOrderedLong(windowStart)).getBytesForLookup();
+ }
+ if (limit == null) {
+ limit = DEFAULT_LIMIT;
+ }
+ iterator = db.iterator();
+ for (iterator.seek(first); entity.getEvents().size() < limit &&
+ iterator.hasNext(); iterator.next()) {
+ byte[] key = iterator.peekNext().getKey();
+ if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
+ WritableComparator.compareBytes(key, 0, key.length, last, 0,
+ last.length) > 0))
+ break;
+ TimelineEvent event = getEntityEvent(eventType, key, prefix.length,
+ iterator.peekNext().getValue());
+ if (event != null)
+ entity.addEvent(event);
+ }
+ }
+ }
+ } finally {
+ IOUtils.cleanup(LOG, iterator);
+ }
+ return events;
+ }
+
+ /**
+ * Returns true if the byte array begins with the specified prefix.
+ */
+ private static boolean prefixMatches(byte[] prefix, int prefixlen,
+ byte[] b) {
+ if (b.length < prefixlen)
+ return false;
+ return WritableComparator.compareBytes(prefix, 0, prefixlen, b, 0,
+ prefixlen) == 0;
+ }
+
+ @Override
+ public TimelineEntities getEntities(String entityType,
+ Long limit, Long windowStart, Long windowEnd,
+ NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+ EnumSet<Field> fields) throws IOException {
+ if (primaryFilter == null) {
+ // if no primary filter is specified, prefix the lookup with
+ // ENTITY_ENTRY_PREFIX
+ return getEntityByTime(ENTITY_ENTRY_PREFIX, entityType, limit,
+ windowStart, windowEnd, secondaryFilters, fields);
+ } else {
+ // if a primary filter is specified, prefix the lookup with
+ // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
+ // ENTITY_ENTRY_PREFIX
+ byte[] base = KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX)
+ .add(primaryFilter.getName())
+ .add(GenericObjectMapper.write(primaryFilter.getValue()), true)
+ .add(ENTITY_ENTRY_PREFIX).getBytesForLookup();
+ return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
+ secondaryFilters, fields);
+ }
+ }
+
+ /**
+ * Retrieves a list of entities satisfying given parameters.
+ *
+ * @param base A byte array prefix for the lookup
+ * @param entityType The type of the entity
+ * @param limit A limit on the number of entities to return
+ * @param starttime The earliest entity start time to retrieve (exclusive)
+ * @param endtime The latest entity start time to retrieve (inclusive)
+ * @param secondaryFilters Filter pairs that the entities should match
+ * @param fields The set of fields to retrieve
+ * @return A list of entities
+ * @throws IOException
+ */
+ private TimelineEntities getEntityByTime(byte[] base,
+ String entityType, Long limit, Long starttime, Long endtime,
+ Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields)
+ throws IOException {
+ DBIterator iterator = null;
+ try {
+ KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
+ // only db keys matching the prefix (base + entity type) will be parsed
+ byte[] prefix = kb.getBytesForLookup();
+ if (endtime == null) {
+ // if end time is null, place no restriction on end time
+ endtime = Long.MAX_VALUE;
+ }
+ // using end time, construct a first key that will be seeked to
+ byte[] revts = writeReverseOrderedLong(endtime);
+ kb.add(revts);
+ byte[] first = kb.getBytesForLookup();
+ byte[] last = null;
+ if (starttime != null) {
+ // if start time is not null, set a last key that will not be
+ // iterated past
+ last = KeyBuilder.newInstance().add(base).add(entityType)
+ .add(writeReverseOrderedLong(starttime)).getBytesForLookup();
+ }
+ if (limit == null) {
+ // if limit is not specified, use the default
+ limit = DEFAULT_LIMIT;
+ }
+
+ TimelineEntities entities = new TimelineEntities();
+ iterator = db.iterator();
+ iterator.seek(first);
+ // iterate until one of the following conditions is met: limit is
+ // reached, there are no more keys, the key prefix no longer matches,
+ // or a start time has been specified and reached/exceeded
+ while (entities.getEntities().size() < limit && iterator.hasNext()) {
+ byte[] key = iterator.peekNext().getKey();
+ if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
+ WritableComparator.compareBytes(key, 0, key.length, last, 0,
+ last.length) > 0))
+ break;
+ // read the start time and entityId from the current key
+ KeyParser kp = new KeyParser(key, prefix.length);
+ Long startTime = kp.getNextLong();
+ String entityId = kp.getNextString();
+ // parse the entity that owns this key, iterating over all keys for
+ // the entity
+ TimelineEntity entity = getEntity(entityId, entityType, startTime,
+ fields, iterator, key, kp.getOffset());
+ if (entity == null)
+ continue;
+ // determine if the retrieved entity matches the provided secondary
+ // filters, and if so add it to the list of entities to return
+ boolean filterPassed = true;
+ if (secondaryFilters != null) {
+ for (NameValuePair filter : secondaryFilters) {
+ Object v = entity.getOtherInfo().get(filter.getName());
+ if (v == null) {
+ Set<Object> vs = entity.getPrimaryFilters()
+ .get(filter.getName());
+ if (vs != null && !vs.contains(filter.getValue())) {
+ filterPassed = false;
+ break;
+ }
+ } else if (!v.equals(filter.getValue())) {
+ filterPassed = false;
+ break;
+ }
+ }
+ }
+ if (filterPassed)
+ entities.addEntity(entity);
+ }
+ return entities;
+ } finally {
+ IOUtils.cleanup(LOG, iterator);
+ }
+ }
+
+ /**
+ * Put a single entity. If there is an error, add a TimelinePutError to the given
+ * response.
+ */
+ private void put(TimelineEntity entity, TimelinePutResponse response) {
+ WriteBatch writeBatch = null;
+ try {
+ writeBatch = db.createWriteBatch();
+ List<TimelineEvent> events = entity.getEvents();
+ // look up the start time for the entity
+ byte[] revStartTime = getStartTime(entity.getEntityId(),
+ entity.getEntityType(), entity.getStartTime(), events,
+ writeBatch);
+ if (revStartTime == null) {
+ // if no start time is found, add an error and return
+ TimelinePutError error = new TimelinePutError();
+ error.setEntityId(entity.getEntityId());
+ error.setEntityType(entity.getEntityType());
+ error.setErrorCode(TimelinePutError.NO_START_TIME);
+ response.addError(error);
+ return;
+ }
+ Long revStartTimeLong = readReverseOrderedLong(revStartTime, 0);
+ Map<String, Set<Object>> primaryFilters = entity.getPrimaryFilters();
+
+ // write event entries
+ if (events != null && !events.isEmpty()) {
+ for (TimelineEvent event : events) {
+ byte[] revts = writeReverseOrderedLong(event.getTimestamp());
+ byte[] key = createEntityEventKey(entity.getEntityId(),
+ entity.getEntityType(), revStartTime, revts,
+ event.getEventType());
+ byte[] value = GenericObjectMapper.write(event.getEventInfo());
+ writeBatch.put(key, value);
+ writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
+ }
+ }
+
+ // write related entity entries
+ Map<String, Set<String>> relatedEntities =
+ entity.getRelatedEntities();
+ if (relatedEntities != null && !relatedEntities.isEmpty()) {
+ for (Entry<String, Set<String>> relatedEntityList :
+ relatedEntities.entrySet()) {
+ String relatedEntityType = relatedEntityList.getKey();
+ for (String relatedEntityId : relatedEntityList.getValue()) {
+ // look up start time of related entity
+ byte[] relatedEntityStartTime = getStartTime(relatedEntityId,
+ relatedEntityType, null, null, writeBatch);
+ if (relatedEntityStartTime == null) {
+ // if start time is not found, set start time of the related
+ // entity to the start time of this entity, and write it to the
+ // db and the cache
+ relatedEntityStartTime = revStartTime;
+ writeBatch.put(createStartTimeLookupKey(relatedEntityId,
+ relatedEntityType), relatedEntityStartTime);
+ startTimeCache.put(new EntityIdentifier(relatedEntityId,
+ relatedEntityType), revStartTimeLong);
+ }
+ // write reverse entry (related entity -> entity)
+ byte[] key = createReleatedEntityKey(relatedEntityId,
+ relatedEntityType, relatedEntityStartTime,
+ entity.getEntityId(), entity.getEntityType());
+ writeBatch.put(key, EMPTY_BYTES);
+ // TODO: write forward entry (entity -> related entity)?
+ }
+ }
+ }
+
+ // write primary filter entries
+ if (primaryFilters != null && !primaryFilters.isEmpty()) {
+ for (Entry<String, Set<Object>> primaryFilter :
+ primaryFilters.entrySet()) {
+ for (Object primaryFilterValue : primaryFilter.getValue()) {
+ byte[] key = createPrimaryFilterKey(entity.getEntityId(),
+ entity.getEntityType(), revStartTime,
+ primaryFilter.getKey(), primaryFilterValue);
+ writeBatch.put(key, EMPTY_BYTES);
+ writePrimaryFilterEntries(writeBatch, primaryFilters, key,
+ EMPTY_BYTES);
+ }
+ }
+ }
+
+ // write other info entries
+ Map<String, Object> otherInfo = entity.getOtherInfo();
+ if (otherInfo != null && !otherInfo.isEmpty()) {
+ for (Entry<String, Object> i : otherInfo.entrySet()) {
+ byte[] key = createOtherInfoKey(entity.getEntityId(),
+ entity.getEntityType(), revStartTime, i.getKey());
+ byte[] value = GenericObjectMapper.write(i.getValue());
+ writeBatch.put(key, value);
+ writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
+ }
+ }
+ db.write(writeBatch);
+ } catch (IOException e) {
+ LOG.error("Error putting entity " + entity.getEntityId() +
+ " of type " + entity.getEntityType(), e);
+ TimelinePutError error = new TimelinePutError();
+ error.setEntityId(entity.getEntityId());
+ error.setEntityType(entity.getEntityType());
+ error.setErrorCode(TimelinePutError.IO_EXCEPTION);
+ response.addError(error);
+ } finally {
+ IOUtils.cleanup(LOG, writeBatch);
+ }
+ }
+
+ /**
+ * For a given key / value pair that has been written to the db,
+ * write additional entries to the db for each primary filter.
+ */
+ private static void writePrimaryFilterEntries(WriteBatch writeBatch,
+ Map<String, Set<Object>> primaryFilters, byte[] key, byte[] value)
+ throws IOException {
+ if (primaryFilters != null && !primaryFilters.isEmpty()) {
+ for (Entry<String, Set<Object>> pf : primaryFilters.entrySet()) {
+ for (Object pfval : pf.getValue()) {
+ writeBatch.put(addPrimaryFilterToKey(pf.getKey(), pfval,
+ key), value);
+ }
+ }
+ }
+ }
+
+ @Override
+ public TimelinePutResponse put(TimelineEntities entities) {
+ TimelinePutResponse response = new TimelinePutResponse();
+ for (TimelineEntity entity : entities.getEntities()) {
+ put(entity, response);
+ }
+ return response;
+ }
+
+ /**
+ * Get the unique start time for a given entity as a byte array that sorts
+ * the timestamps in reverse order (see {@link
+ * GenericObjectMapper#writeReverseOrderedLong(long)}).
+ *
+ * @param entityId The id of the entity
+ * @param entityType The type of the entity
+ * @param startTime The start time of the entity, or null
+ * @param events A list of events for the entity, or null
+ * @param writeBatch A leveldb write batch, if the method is called by a
+ * put as opposed to a get
+ * @return A byte array
+ * @throws IOException
+ */
+ private byte[] getStartTime(String entityId, String entityType,
+ Long startTime, List<TimelineEvent> events, WriteBatch writeBatch)
+ throws IOException {
+ EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
+ if (startTime == null) {
+ // start time is not provided, so try to look it up
+ if (startTimeCache.containsKey(entity)) {
+ // found the start time in the cache
+ startTime = startTimeCache.get(entity);
+ } else {
+ // try to look up the start time in the db
+ byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
+ byte[] v = db.get(b);
+ if (v == null) {
+ // did not find the start time in the db
+ // if this is a put, try to set it from the provided events
+ if (events == null || writeBatch == null) {
+ // no events, or not a put, so return null
+ return null;
+ }
+ Long min = Long.MAX_VALUE;
+ for (TimelineEvent e : events)
+ if (min > e.getTimestamp())
+ min = e.getTimestamp();
+ startTime = min;
+ // selected start time as minimum timestamp of provided events
+ // write start time to db and cache
+ writeBatch.put(b, writeReverseOrderedLong(startTime));
+ startTimeCache.put(entity, startTime);
+ } else {
+ // found the start time in the db
+ startTime = readReverseOrderedLong(v, 0);
+ if (writeBatch != null) {
+ // if this is a put, re-add the start time to the cache
+ startTimeCache.put(entity, startTime);
+ }
+ }
+ }
+ } else {
+ // start time is provided
+ // TODO: verify start time in db as well as cache?
+ if (startTimeCache.containsKey(entity)) {
+ // if the start time is already in the cache,
+ // and it is different from the provided start time,
+ // use the one from the cache
+ if (!startTime.equals(startTimeCache.get(entity)))
+ startTime = startTimeCache.get(entity);
+ } else if (writeBatch != null) {
+ // if this is a put, write the provided start time to the db and the
+ // cache
+ byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
+ writeBatch.put(b, writeReverseOrderedLong(startTime));
+ startTimeCache.put(entity, startTime);
+ }
+ }
+ return writeReverseOrderedLong(startTime);
+ }
+
+ /**
+ * Creates a key for looking up the start time of a given entity,
+ * of the form START_TIME_LOOKUP_PREFIX + entitytype + entity.
+ */
+ private static byte[] createStartTimeLookupKey(String entity,
+ String entitytype) throws IOException {
+ return KeyBuilder.newInstance().add(START_TIME_LOOKUP_PREFIX)
+ .add(entitytype).add(entity).getBytes();
+ }
+
+ /**
+ * Creates an index entry for the given key of the form
+ * INDEXED_ENTRY_PREFIX + primaryfiltername + primaryfiltervalue + key.
+ */
+ private static byte[] addPrimaryFilterToKey(String primaryFilterName,
+ Object primaryFilterValue, byte[] key) throws IOException {
+ return KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX)
+ .add(primaryFilterName)
+ .add(GenericObjectMapper.write(primaryFilterValue), true).add(key)
+ .getBytes();
+ }
+
+ /**
+ * Creates an event key, serializing ENTITY_ENTRY_PREFIX + entitytype +
+ * revstarttime + entity + TIME_COLUMN + reveventtimestamp + eventtype.
+ */
+ private static byte[] createEntityEventKey(String entity, String entitytype,
+ byte[] revStartTime, byte[] reveventtimestamp, String eventtype)
+ throws IOException {
+ return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
+ .add(entitytype).add(revStartTime).add(entity).add(TIME_COLUMN)
+ .add(reveventtimestamp).add(eventtype).getBytes();
+ }
+
+ /**
+ * Creates an event object from the given key, offset, and value. If the
+ * event type is not contained in the specified set of event types,
+ * returns null.
+ */
+ private static TimelineEvent getEntityEvent(Set<String> eventTypes, byte[] key,
+ int offset, byte[] value) throws IOException {
+ KeyParser kp = new KeyParser(key, offset);
+ long ts = kp.getNextLong();
+ String tstype = kp.getNextString();
+ if (eventTypes == null || eventTypes.contains(tstype)) {
+ TimelineEvent event = new TimelineEvent();
+ event.setTimestamp(ts);
+ event.setEventType(tstype);
+ Object o = GenericObjectMapper.read(value);
+ if (o == null) {
+ event.setEventInfo(null);
+ } else if (o instanceof Map) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> m = (Map<String, Object>) o;
+ event.setEventInfo(m);
+ } else {
+ throw new IOException("Couldn't deserialize event info map");
+ }
+ return event;
+ }
+ return null;
+ }
+
+ /**
+ * Creates a primary filter key, serializing ENTITY_ENTRY_PREFIX +
+ * entitytype + revstarttime + entity + PRIMARY_FILTER_COLUMN + name + value.
+ */
+ private static byte[] createPrimaryFilterKey(String entity,
+ String entitytype, byte[] revStartTime, String name, Object value)
+ throws IOException {
+ return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype)
+ .add(revStartTime).add(entity).add(PRIMARY_FILTER_COLUMN).add(name)
+ .add(GenericObjectMapper.write(value)).getBytes();
+ }
+
+ /**
+ * Parses the primary filter from the given key at the given offset and
+ * adds it to the given entity.
+ */
+ private static void addPrimaryFilter(TimelineEntity entity, byte[] key,
+ int offset) throws IOException {
+ KeyParser kp = new KeyParser(key, offset);
+ String name = kp.getNextString();
+ Object value = GenericObjectMapper.read(key, kp.getOffset());
+ entity.addPrimaryFilter(name, value);
+ }
+
+ /**
+ * Creates an other info key, serializing ENTITY_ENTRY_PREFIX + entitytype +
+ * revstarttime + entity + OTHER_INFO_COLUMN + name.
+ */
+ private static byte[] createOtherInfoKey(String entity, String entitytype,
+ byte[] revStartTime, String name) throws IOException {
+ return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype)
+ .add(revStartTime).add(entity).add(OTHER_INFO_COLUMN).add(name)
+ .getBytes();
+ }
+
+ /**
+ * Creates a string representation of the byte array from the given offset
+ * to the end of the array (for parsing other info keys).
+ */
+ private static String parseRemainingKey(byte[] b, int offset) {
+ return new String(b, offset, b.length - offset);
+ }
+
+ /**
+ * Creates a related entity key, serializing ENTITY_ENTRY_PREFIX +
+ * entitytype + revstarttime + entity + RELATED_COLUMN + relatedentitytype +
+ * relatedentity.
+ */
+ private static byte[] createReleatedEntityKey(String entity,
+ String entitytype, byte[] revStartTime, String relatedEntity,
+ String relatedEntityType) throws IOException {
+ return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype)
+ .add(revStartTime).add(entity).add(RELATED_COLUMN)
+ .add(relatedEntityType).add(relatedEntity).getBytes();
+ }
+
+ /**
+ * Parses the related entity from the given key at the given offset and
+ * adds it to the given entity.
+ */
+ private static void addRelatedEntity(TimelineEntity entity, byte[] key,
+ int offset) throws IOException {
+ KeyParser kp = new KeyParser(key, offset);
+ String type = kp.getNextString();
+ String id = kp.getNextString();
+ entity.addRelatedEntity(type, id);
+ }
+
+ /**
+ * Clears the cache to test reloading start times from leveldb (only for
+ * testing).
+ */
+ @VisibleForTesting
+ void clearStartTimeCache() {
+ startTimeCache.clear();
+ }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java?rev=1570922&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java Sat Feb 22 20:55:06 2014
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.SortedSet;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+
+/**
+ * In-memory implementation of {@link TimelineStore}. This
+ * implementation is for test purpose only. If users improperly instantiate it,
+ * they may encounter reading and writing history data in different memory
+ * store.
+ *
+ */
+@Private
+@Unstable
+public class MemoryTimelineStore
+ extends AbstractService implements TimelineStore {
+
+ private Map<EntityIdentifier, TimelineEntity> entities =
+ new HashMap<EntityIdentifier, TimelineEntity>();
+
+ public MemoryTimelineStore() {
+ super(MemoryTimelineStore.class.getName());
+ }
+
+ @Override
+ public TimelineEntities getEntities(String entityType, Long limit,
+ Long windowStart, Long windowEnd, NameValuePair primaryFilter,
+ Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields) {
+ if (limit == null) {
+ limit = DEFAULT_LIMIT;
+ }
+ if (windowStart == null) {
+ windowStart = Long.MIN_VALUE;
+ }
+ if (windowEnd == null) {
+ windowEnd = Long.MAX_VALUE;
+ }
+ if (fields == null) {
+ fields = EnumSet.allOf(Field.class);
+ }
+ List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();
+ for (TimelineEntity entity : new PriorityQueue<TimelineEntity>(entities.values())) {
+ if (entitiesSelected.size() >= limit) {
+ break;
+ }
+ if (!entity.getEntityType().equals(entityType)) {
+ continue;
+ }
+ if (entity.getStartTime() <= windowStart) {
+ continue;
+ }
+ if (entity.getStartTime() > windowEnd) {
+ continue;
+ }
+ if (primaryFilter != null &&
+ !matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) {
+ continue;
+ }
+ if (secondaryFilters != null) { // OR logic
+ boolean flag = false;
+ for (NameValuePair secondaryFilter : secondaryFilters) {
+ if (secondaryFilter != null &&
+ matchFilter(entity.getOtherInfo(), secondaryFilter)) {
+ flag = true;
+ break;
+ }
+ }
+ if (!flag) {
+ continue;
+ }
+ }
+ entitiesSelected.add(entity);
+ }
+ List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
+ for (TimelineEntity entitySelected : entitiesSelected) {
+ entitiesToReturn.add(maskFields(entitySelected, fields));
+ }
+ Collections.sort(entitiesToReturn);
+ TimelineEntities entitiesWrapper = new TimelineEntities();
+ entitiesWrapper.setEntities(entitiesToReturn);
+ return entitiesWrapper;
+ }
+
+ @Override
+ public TimelineEntity getEntity(String entityId, String entityType,
+ EnumSet<Field> fieldsToRetrieve) {
+ if (fieldsToRetrieve == null) {
+ fieldsToRetrieve = EnumSet.allOf(Field.class);
+ }
+ TimelineEntity entity = entities.get(new EntityIdentifier(entityId, entityType));
+ if (entity == null) {
+ return null;
+ } else {
+ return maskFields(entity, fieldsToRetrieve);
+ }
+ }
+
+ @Override
+ public TimelineEvents getEntityTimelines(String entityType,
+ SortedSet<String> entityIds, Long limit, Long windowStart,
+ Long windowEnd,
+ Set<String> eventTypes) {
+ TimelineEvents allEvents = new TimelineEvents();
+ if (entityIds == null) {
+ return allEvents;
+ }
+ if (limit == null) {
+ limit = DEFAULT_LIMIT;
+ }
+ if (windowStart == null) {
+ windowStart = Long.MIN_VALUE;
+ }
+ if (windowEnd == null) {
+ windowEnd = Long.MAX_VALUE;
+ }
+ for (String entityId : entityIds) {
+ EntityIdentifier entityID = new EntityIdentifier(entityId, entityType);
+ TimelineEntity entity = entities.get(entityID);
+ if (entity == null) {
+ continue;
+ }
+ EventsOfOneEntity events = new EventsOfOneEntity();
+ events.setEntityId(entityId);
+ events.setEntityType(entityType);
+ for (TimelineEvent event : entity.getEvents()) {
+ if (events.getEvents().size() >= limit) {
+ break;
+ }
+ if (event.getTimestamp() <= windowStart) {
+ continue;
+ }
+ if (event.getTimestamp() > windowEnd) {
+ continue;
+ }
+ if (eventTypes != null && !eventTypes.contains(event.getEventType())) {
+ continue;
+ }
+ events.addEvent(event);
+ }
+ allEvents.addEvent(events);
+ }
+ return allEvents;
+ }
+
+ @Override
+ public TimelinePutResponse put(TimelineEntities data) {
+ TimelinePutResponse response = new TimelinePutResponse();
+ for (TimelineEntity entity : data.getEntities()) {
+ EntityIdentifier entityId =
+ new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
+ // store entity info in memory
+ TimelineEntity existingEntity = entities.get(entityId);
+ if (existingEntity == null) {
+ existingEntity = new TimelineEntity();
+ existingEntity.setEntityId(entity.getEntityId());
+ existingEntity.setEntityType(entity.getEntityType());
+ existingEntity.setStartTime(entity.getStartTime());
+ entities.put(entityId, existingEntity);
+ }
+ if (entity.getEvents() != null) {
+ if (existingEntity.getEvents() == null) {
+ existingEntity.setEvents(entity.getEvents());
+ } else {
+ existingEntity.addEvents(entity.getEvents());
+ }
+ Collections.sort(existingEntity.getEvents());
+ }
+ // check startTime
+ if (existingEntity.getStartTime() == null) {
+ if (existingEntity.getEvents() == null
+ || existingEntity.getEvents().isEmpty()) {
+ TimelinePutError error = new TimelinePutError();
+ error.setEntityId(entityId.getId());
+ error.setEntityType(entityId.getType());
+ error.setErrorCode(TimelinePutError.NO_START_TIME);
+ response.addError(error);
+ entities.remove(entityId);
+ continue;
+ } else {
+ existingEntity.setStartTime(entity.getEvents().get(0).getTimestamp());
+ }
+ }
+ if (entity.getPrimaryFilters() != null) {
+ if (existingEntity.getPrimaryFilters() == null) {
+ existingEntity.setPrimaryFilters(entity.getPrimaryFilters());
+ } else {
+ existingEntity.addPrimaryFilters(entity.getPrimaryFilters());
+ }
+ }
+ if (entity.getOtherInfo() != null) {
+ if (existingEntity.getOtherInfo() == null) {
+ existingEntity.setOtherInfo(entity.getOtherInfo());
+ } else {
+ existingEntity.addOtherInfo(entity.getOtherInfo());
+ }
+ }
+ // relate it to other entities
+ if (entity.getRelatedEntities() == null) {
+ continue;
+ }
+ for (Map.Entry<String, Set<String>> partRelatedEntities : entity
+ .getRelatedEntities().entrySet()) {
+ if (partRelatedEntities == null) {
+ continue;
+ }
+ for (String idStr : partRelatedEntities.getValue()) {
+ EntityIdentifier relatedEntityId =
+ new EntityIdentifier(idStr, partRelatedEntities.getKey());
+ TimelineEntity relatedEntity = entities.get(relatedEntityId);
+ if (relatedEntity != null) {
+ relatedEntity.addRelatedEntity(
+ existingEntity.getEntityType(), existingEntity.getEntityId());
+ } else {
+ relatedEntity = new TimelineEntity();
+ relatedEntity.setEntityId(relatedEntityId.getId());
+ relatedEntity.setEntityType(relatedEntityId.getType());
+ relatedEntity.setStartTime(existingEntity.getStartTime());
+ relatedEntity.addRelatedEntity(existingEntity.getEntityType(),
+ existingEntity.getEntityId());
+ entities.put(relatedEntityId, relatedEntity);
+ }
+ }
+ }
+ }
+ return response;
+ }
+
+ private static TimelineEntity maskFields(
+ TimelineEntity entity, EnumSet<Field> fields) {
+ // Conceal the fields that are not going to be exposed
+ TimelineEntity entityToReturn = new TimelineEntity();
+ entityToReturn.setEntityId(entity.getEntityId());
+ entityToReturn.setEntityType(entity.getEntityType());
+ entityToReturn.setStartTime(entity.getStartTime());
+ entityToReturn.setEvents(fields.contains(Field.EVENTS) ?
+ entity.getEvents() : fields.contains(Field.LAST_EVENT_ONLY) ?
+ Arrays.asList(entity.getEvents().get(0)) : null);
+ entityToReturn.setRelatedEntities(fields.contains(Field.RELATED_ENTITIES) ?
+ entity.getRelatedEntities() : null);
+ entityToReturn.setPrimaryFilters(fields.contains(Field.PRIMARY_FILTERS) ?
+ entity.getPrimaryFilters() : null);
+ entityToReturn.setOtherInfo(fields.contains(Field.OTHER_INFO) ?
+ entity.getOtherInfo() : null);
+ return entityToReturn;
+ }
+
+ private static boolean matchFilter(Map<String, Object> tags,
+ NameValuePair filter) {
+ Object value = tags.get(filter.getName());
+ if (value == null) { // doesn't have the filter
+ return false;
+ } else if (!value.equals(filter.getValue())) { // doesn't match the filter
+ return false;
+ }
+ return true;
+ }
+
+ private static boolean matchPrimaryFilter(Map<String, Set<Object>> tags,
+ NameValuePair filter) {
+ Set<Object> value = tags.get(filter.getName());
+ if (value == null) { // doesn't have the filter
+ return false;
+ } else {
+ return value.contains(filter.getValue());
+ }
+ }
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/NameValuePair.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/NameValuePair.java?rev=1570922&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/NameValuePair.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/NameValuePair.java Sat Feb 22 20:55:06 2014
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A class holding a name and value pair, used for specifying filters in
+ * {@link TimelineReader}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class NameValuePair {
+ String name;
+ Object value;
+
+ public NameValuePair(String name, Object value) {
+ this.name = name;
+ this.value = value;
+ }
+
+ /**
+ * Get the name.
+ * @return The name.
+ */
+ public String getName() {
+
+ return name;
+ }
+
+ /**
+ * Get the value.
+ * @return The value.
+ */
+ public Object getValue() {
+ return value;
+ }
+
+ @Override
+ public String toString() {
+ return "{ name: " + name + ", value: " + value + " }";
+ }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java?rev=1570922&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java Sat Feb 22 20:55:06 2014
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.SortedSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+
+/**
+ * This interface is for retrieving timeline information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface TimelineReader {
+
+ /**
+ * Possible fields to retrieve for {@link #getEntities} and {@link #getEntity}
+ * .
+ */
+ enum Field {
+ EVENTS,
+ RELATED_ENTITIES,
+ PRIMARY_FILTERS,
+ OTHER_INFO,
+ LAST_EVENT_ONLY
+ }
+
+ /**
+ * Default limit for {@link #getEntities} and {@link #getEntityTimelines}.
+ */
+ final long DEFAULT_LIMIT = 100;
+
+ /**
+ * This method retrieves a list of entity information, {@link TimelineEntity}, sorted
+ * by the starting timestamp for the entity, descending.
+ *
+ * @param entityType
+ * The type of entities to return (required).
+ * @param limit
+ * A limit on the number of entities to return. If null, defaults to
+ * {@link #DEFAULT_LIMIT}.
+ * @param windowStart
+ * The earliest start timestamp to retrieve (exclusive). If null,
+ * defaults to retrieving all entities until the limit is reached.
+ * @param windowEnd
+ * The latest start timestamp to retrieve (inclusive). If null,
+ * defaults to {@link Long#MAX_VALUE}
+ * @param primaryFilter
+ * Retrieves only entities that have the specified primary filter. If
+ * null, retrieves all entities. This is an indexed retrieval, and no
+ * entities that do not match the filter are scanned.
+ * @param secondaryFilters
+ * Retrieves only entities that have exact matches for all the
+ * specified filters in their primary filters or other info. This is
+ * not an indexed retrieval, so all entities are scanned but only
+ * those matching the filters are returned.
+ * @param fieldsToRetrieve
+ * Specifies which fields of the entity object to retrieve (see
+ * {@link Field}). If the set of fields contains
+ * {@link Field#LAST_EVENT_ONLY} and not {@link Field#EVENTS}, the
+ * most recent event for each entity is retrieved. If null, retrieves
+ * all fields.
+ * @return An {@link TimelineEntities} object.
+ * @throws IOException
+ */
+ TimelineEntities getEntities(String entityType,
+ Long limit, Long windowStart, Long windowEnd,
+ NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+ EnumSet<Field> fieldsToRetrieve) throws IOException;
+
+ /**
+ * This method retrieves the entity information for a given entity.
+ *
+ * @param entityId
+ * The entity whose information will be retrieved.
+ * @param entityType
+ * The type of the entity.
+ * @param fieldsToRetrieve
+ * Specifies which fields of the entity object to retrieve (see
+ * {@link Field}). If the set of fields contains
+ * {@link Field#LAST_EVENT_ONLY} and not {@link Field#EVENTS}, the
+ * most recent event for each entity is retrieved. If null, retrieves
+ * all fields.
+ * @return An {@link TimelineEntity} object.
+ * @throws IOException
+ */
+ TimelineEntity getEntity(String entityId, String entityType, EnumSet<Field>
+ fieldsToRetrieve) throws IOException;
+
+ /**
+ * This method retrieves the events for a list of entities all of the same
+ * entity type. The events for each entity are sorted in order of their
+ * timestamps, descending.
+ *
+ * @param entityType
+ * The type of entities to retrieve events for.
+ * @param entityIds
+ * The entity IDs to retrieve events for.
+ * @param limit
+ * A limit on the number of events to return for each entity. If
+ * null, defaults to {@link #DEFAULT_LIMIT} events per entity.
+ * @param windowStart
+ * If not null, retrieves only events later than the given time
+ * (exclusive)
+ * @param windowEnd
+ * If not null, retrieves only events earlier than the given time
+ * (inclusive)
+ * @param eventTypes
+ * Restricts the events returned to the given types. If null, events
+ * of all types will be returned.
+ * @return An {@link TimelineEvents} object.
+ * @throws IOException
+ */
+ TimelineEvents getEntityTimelines(String entityType,
+ SortedSet<String> entityIds, Long limit, Long windowStart,
+ Long windowEnd, Set<String> eventTypes) throws IOException;
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStore.java?rev=1570922&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStore.java Sat Feb 22 20:55:06 2014
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.Service;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface TimelineStore extends
+ Service, TimelineReader, TimelineWriter {
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineWriter.java?rev=1570922&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineWriter.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineWriter.java Sat Feb 22 20:55:06 2014
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
+import java.io.IOException;
+
+/**
+ * This interface is for storing timeline information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface TimelineWriter {
+
+ /**
+ * Stores entity information to the timeline store. Any errors occurring for
+ * individual put request objects will be reported in the response.
+ *
+ * @param data
+ * An {@link TimelineEntities} object.
+ * @return An {@link TimelinePutResponse} object.
+ * @throws IOException
+ */
+ TimelinePutResponse put(TimelineEntities data) throws IOException;
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/package-info.java?rev=1570922&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/package-info.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/package-info.java Sat Feb 22 20:55:06 2014
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+import org.apache.hadoop.classification.InterfaceAudience;
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java?rev=1570922&r1=1570921&r2=1570922&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java Sat Feb 22 20:55:06 2014
@@ -21,7 +21,7 @@ import static org.apache.hadoop.yarn.uti
import org.apache.hadoop.yarn.server.api.ApplicationContext;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManager;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
@@ -30,22 +30,22 @@ import org.apache.hadoop.yarn.webapp.Yar
public class AHSWebApp extends WebApp implements YarnWebParams {
private final ApplicationHistoryManager applicationHistoryManager;
- private final ApplicationTimelineStore applicationTimelineStore;
+ private final TimelineStore timelineStore;
public AHSWebApp(ApplicationHistoryManager applicationHistoryManager,
- ApplicationTimelineStore applicationTimelineStore) {
+ TimelineStore timelineStore) {
this.applicationHistoryManager = applicationHistoryManager;
- this.applicationTimelineStore = applicationTimelineStore;
+ this.timelineStore = timelineStore;
}
@Override
public void setup() {
bind(YarnJacksonJaxbJsonProvider.class);
bind(AHSWebServices.class);
- bind(ATSWebServices.class);
+ bind(TimelineWebServices.class);
bind(GenericExceptionHandler.class);
bind(ApplicationContext.class).toInstance(applicationHistoryManager);
- bind(ApplicationTimelineStore.class).toInstance(applicationTimelineStore);
+ bind(TimelineStore.class).toInstance(timelineStore);
route("/", AHSController.class);
route(pajoin("/apps", APP_STATE), AHSController.class);
route(pajoin("/app", APPLICATION_ID), AHSController.class, "app");
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java?rev=1570922&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java Sat Feb 22 20:55:06 2014
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.NameValuePair;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field;
+import org.apache.hadoop.yarn.webapp.BadRequestException;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+@Path("/ws/v1/timeline")
+//TODO: support XML serialization/deserialization
+public class TimelineWebServices {
+
+ private static final Log LOG = LogFactory.getLog(TimelineWebServices.class);
+
+ private TimelineStore store;
+
+ @Inject
+ public TimelineWebServices(TimelineStore store) {
+ this.store = store;
+ }
+
+ @XmlRootElement(name = "about")
+ @XmlAccessorType(XmlAccessType.NONE)
+ @Public
+ @Unstable
+ public static class AboutInfo {
+
+ private String about;
+
+ public AboutInfo() {
+
+ }
+
+ public AboutInfo(String about) {
+ this.about = about;
+ }
+
+ @XmlElement(name = "About")
+ public String getAbout() {
+ return about;
+ }
+
+ public void setAbout(String about) {
+ this.about = about;
+ }
+
+ }
+
+ /**
+ * Return the description of the timeline web services.
+ */
+ @GET
+ @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ public AboutInfo about(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res) {
+ init(res);
+ return new AboutInfo("Timeline API");
+ }
+
+ /**
+ * Return a list of entities that match the given parameters.
+ */
+ @GET
+ @Path("/{entityType}")
+ @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ public TimelineEntities getEntities(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @PathParam("entityType") String entityType,
+ @QueryParam("primaryFilter") String primaryFilter,
+ @QueryParam("secondaryFilter") String secondaryFilter,
+ @QueryParam("windowStart") String windowStart,
+ @QueryParam("windowEnd") String windowEnd,
+ @QueryParam("limit") String limit,
+ @QueryParam("fields") String fields) {
+ init(res);
+ TimelineEntities entities = null;
+ try {
+ entities = store.getEntities(
+ parseStr(entityType),
+ parseLongStr(limit),
+ parseLongStr(windowStart),
+ parseLongStr(windowEnd),
+ parsePairStr(primaryFilter, ":"),
+ parsePairsStr(secondaryFilter, ",", ":"),
+ parseFieldsStr(fields, ","));
+ } catch (NumberFormatException e) {
+ throw new BadRequestException(
+ "windowStart, windowEnd or limit is not a numeric value.");
+ } catch (IllegalArgumentException e) {
+ throw new BadRequestException("requested invalid field.");
+ } catch (IOException e) {
+ LOG.error("Error getting entities", e);
+ throw new WebApplicationException(e,
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ if (entities == null) {
+ return new TimelineEntities();
+ }
+ return entities;
+ }
+
+ /**
+ * Return a single entity of the given entity type and Id.
+ */
+ @GET
+ @Path("/{entityType}/{entityId}")
+ @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ public TimelineEntity getEntity(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @PathParam("entityType") String entityType,
+ @PathParam("entityId") String entityId,
+ @QueryParam("fields") String fields) {
+ init(res);
+ TimelineEntity entity = null;
+ try {
+ entity =
+ store.getEntity(parseStr(entityId), parseStr(entityType),
+ parseFieldsStr(fields, ","));
+ } catch (IllegalArgumentException e) {
+ throw new BadRequestException(
+ "requested invalid field.");
+ } catch (IOException e) {
+ LOG.error("Error getting entity", e);
+ throw new WebApplicationException(e,
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ if (entity == null) {
+ throw new WebApplicationException(Response.Status.NOT_FOUND);
+ }
+ return entity;
+ }
+
+ /**
+ * Return the events that match the given parameters.
+ */
+ @GET
+ @Path("/{entityType}/events")
+ @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ public TimelineEvents getEvents(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @PathParam("entityType") String entityType,
+ @QueryParam("entityId") String entityId,
+ @QueryParam("eventType") String eventType,
+ @QueryParam("windowStart") String windowStart,
+ @QueryParam("windowEnd") String windowEnd,
+ @QueryParam("limit") String limit) {
+ init(res);
+ TimelineEvents events = null;
+ try {
+ events = store.getEntityTimelines(
+ parseStr(entityType),
+ parseArrayStr(entityId, ","),
+ parseLongStr(limit),
+ parseLongStr(windowStart),
+ parseLongStr(windowEnd),
+ parseArrayStr(eventType, ","));
+ } catch (NumberFormatException e) {
+ throw new BadRequestException(
+ "windowStart, windowEnd or limit is not a numeric value.");
+ } catch (IOException e) {
+ LOG.error("Error getting entity timelines", e);
+ throw new WebApplicationException(e,
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ if (events == null) {
+ return new TimelineEvents();
+ }
+ return events;
+ }
+
+ /**
+ * Store the given entities into the timeline store, and return the errors
+ * that happen during storing.
+ */
+ @POST
+ @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ public TimelinePutResponse postEntities(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ TimelineEntities entities) {
+ init(res);
+ if (entities == null) {
+ return new TimelinePutResponse();
+ }
+ try {
+ return store.put(entities);
+ } catch (IOException e) {
+ LOG.error("Error putting entities", e);
+ throw new WebApplicationException(e,
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ private void init(HttpServletResponse response) {
+ response.setContentType(null);
+ }
+
+ private static SortedSet<String> parseArrayStr(String str, String delimiter) {
+ if (str == null) {
+ return null;
+ }
+ SortedSet<String> strSet = new TreeSet<String>();
+ String[] strs = str.split(delimiter);
+ for (String aStr : strs) {
+ strSet.add(aStr.trim());
+ }
+ return strSet;
+ }
+
+ private static NameValuePair parsePairStr(String str, String delimiter) {
+ if (str == null) {
+ return null;
+ }
+ String[] strs = str.split(delimiter, 2);
+ return new NameValuePair(strs[0].trim(), strs[1].trim());
+ }
+
+ private static Collection<NameValuePair> parsePairsStr(
+ String str, String aDelimiter, String pDelimiter) {
+ if (str == null) {
+ return null;
+ }
+ String[] strs = str.split(aDelimiter);
+ Set<NameValuePair> pairs = new HashSet<NameValuePair>();
+ for (String aStr : strs) {
+ pairs.add(parsePairStr(aStr, pDelimiter));
+ }
+ return pairs;
+ }
+
+ private static EnumSet<Field> parseFieldsStr(String str, String delimiter) {
+ if (str == null) {
+ return null;
+ }
+ String[] strs = str.split(delimiter);
+ List<Field> fieldList = new ArrayList<Field>();
+ for (String s : strs) {
+ s = s.trim().toUpperCase();
+ if (s.equals("EVENTS"))
+ fieldList.add(Field.EVENTS);
+ else if (s.equals("LASTEVENTONLY"))
+ fieldList.add(Field.LAST_EVENT_ONLY);
+ else if (s.equals("RELATEDENTITIES"))
+ fieldList.add(Field.RELATED_ENTITIES);
+ else if (s.equals("PRIMARYFILTERS"))
+ fieldList.add(Field.PRIMARY_FILTERS);
+ else if (s.equals("OTHERINFO"))
+ fieldList.add(Field.OTHER_INFO);
+ }
+ if (fieldList.size() == 0)
+ return null;
+ Field f1 = fieldList.remove(fieldList.size() - 1);
+ if (fieldList.size() == 0)
+ return EnumSet.of(f1);
+ else
+ return EnumSet.of(f1, fieldList.toArray(new Field[fieldList.size()]));
+ }
+
+ private static Long parseLongStr(String str) {
+ return str == null ? null : Long.parseLong(str.trim());
+ }
+
+ private static String parseStr(String str) {
+ return str == null ? null : str.trim();
+ }
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestGenericObjectMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestGenericObjectMapper.java?rev=1570922&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestGenericObjectMapper.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestGenericObjectMapper.java Sat Feb 22 20:55:06 2014
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TestGenericObjectMapper {
+
+ @Test
+ public void testEncoding() {
+ testEncoding(Long.MAX_VALUE);
+ testEncoding(Long.MIN_VALUE);
+ testEncoding(0l);
+ testEncoding(128l);
+ testEncoding(256l);
+ testEncoding(512l);
+ testEncoding(-256l);
+ }
+
+ private static void testEncoding(long l) {
+ byte[] b = GenericObjectMapper.writeReverseOrderedLong(l);
+ assertEquals("error decoding", l,
+ GenericObjectMapper.readReverseOrderedLong(b, 0));
+ byte[] buf = new byte[16];
+ System.arraycopy(b, 0, buf, 5, 8);
+ assertEquals("error decoding at offset", l,
+ GenericObjectMapper.readReverseOrderedLong(buf, 5));
+ if (l > Long.MIN_VALUE) {
+ byte[] a = GenericObjectMapper.writeReverseOrderedLong(l-1);
+ assertEquals("error preserving ordering", 1,
+ WritableComparator.compareBytes(a, 0, a.length, b, 0, b.length));
+ }
+ if (l < Long.MAX_VALUE) {
+ byte[] c = GenericObjectMapper.writeReverseOrderedLong(l+1);
+ assertEquals("error preserving ordering", 1,
+ WritableComparator.compareBytes(b, 0, b.length, c, 0, c.length));
+ }
+ }
+
+ private static void verify(Object o) throws IOException {
+ assertEquals(o, GenericObjectMapper.read(GenericObjectMapper.write(o)));
+ }
+
+ @Test
+ public void testValueTypes() throws IOException {
+ verify(42l);
+ verify(42);
+ verify(1.23);
+ verify("abc");
+ verify(true);
+ List<String> list = new ArrayList<String>();
+ list.add("123");
+ list.add("abc");
+ verify(list);
+ Map<String,String> map = new HashMap<String,String>();
+ map.put("k1","v1");
+ map.put("k2","v2");
+ verify(map);
+ }
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java?rev=1570922&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java Sat Feb 22 20:55:06 2014
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TestLeveldbTimelineStore
+ extends TimelineStoreTestUtils {
+ private FileContext fsContext;
+ private File fsPath;
+
+ @Before
+ public void setup() throws Exception {
+ fsContext = FileContext.getLocalFSFileContext();
+ Configuration conf = new Configuration();
+ fsPath = new File("target", this.getClass().getSimpleName() +
+ "-tmpDir").getAbsoluteFile();
+ fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
+ conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
+ fsPath.getAbsolutePath());
+ store = new LeveldbTimelineStore();
+ store.init(conf);
+ store.start();
+ loadTestData();
+ loadVerificationData();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ store.stop();
+ fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
+ }
+
+ @Test
+ public void testGetSingleEntity() throws IOException {
+ super.testGetSingleEntity();
+ ((LeveldbTimelineStore)store).clearStartTimeCache();
+ super.testGetSingleEntity();
+ }
+
+ @Test
+ public void testGetEntities() throws IOException {
+ super.testGetEntities();
+ }
+
+ @Test
+ public void testGetEntitiesWithPrimaryFilters() throws IOException {
+ super.testGetEntitiesWithPrimaryFilters();
+ }
+
+ @Test
+ public void testGetEntitiesWithSecondaryFilters() throws IOException {
+ super.testGetEntitiesWithSecondaryFilters();
+ }
+
+ @Test
+ public void testGetEvents() throws IOException {
+ super.testGetEvents();
+ }
+
+}