Posted to commits@ambari.apache.org by sw...@apache.org on 2014/12/02 18:28:29 UTC

[13/30] ambari git commit: AMBARI-5707. Replace Ganglia with high performant and pluggable Metrics System. (swagle)

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java
new file mode 100644
index 0000000..edd4842
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java
@@ -0,0 +1,1473 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBIterator;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.ReadOptions;
+import org.iq80.leveldb.WriteBatch;
+import org.iq80.leveldb.WriteOptions;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.readReverseOrderedLong;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.writeReverseOrderedLong;
+
+/**
+ * <p>An implementation of an application timeline store backed by leveldb.</p>
+ *
+ * <p>There are three sections of the db: the start time section,
+ * the entity section, and the indexed entity section.</p>
+ *
+ * <p>The start time section is used to retrieve the unique start time for
+ * a given entity. Its values each contain a start time while its keys are of
+ * the form:</p>
+ * <pre>
+ *   START_TIME_LOOKUP_PREFIX + entity type + entity id</pre>
+ *
+ * <p>The entity section is ordered by entity type, then entity start time
+ * descending, then entity id. There are four client-visible sub-sections
+ * of the entity section: events, primary filters, related entities,
+ * and other info, plus an internal sub-section of invisible reverse
+ * related entity entries. The event entries have event info serialized
+ * into their
+ * values. The other info entries have values corresponding to the values of
+ * the other info name/value map for the entry (note the names are contained
+ * in the key). All other entries have empty values. The key structure is as
+ * follows:</p>
+ * <pre>
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     EVENTS_COLUMN + reveventtimestamp + eventtype
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     PRIMARY_FILTERS_COLUMN + name + value
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     OTHER_INFO_COLUMN + name
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     RELATED_ENTITIES_COLUMN + relatedentity type + relatedentity id
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN + relatedentity type +
+ *     relatedentity id</pre>
+ *
+ * <p>The indexed entity section contains a primary filter name and primary
+ * filter value as the prefix. Within a given name/value, entire entity
+ * entries are stored in the same format as described in the entity section
+ * above (below, "key" represents any one of the possible entity entry keys
+ * described above).</p>
+ * <pre>
+ *   INDEXED_ENTRY_PREFIX + primaryfilter name + primaryfilter value +
+ *     key</pre>
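+ *
+ * <p>As a concrete sketch (the type "APP", the id "app_1" and the event
+ * type "START" are hypothetical; 0x0 marks the single-byte separator
+ * written after a string element), a start time lookup key and one event
+ * key would be laid out as:</p>
+ * <pre>
+ *   "k" + "APP" + 0x0 + "app_1"
+ *
+ *   "e" + "APP" + 0x0 + revstarttime + "app_1" + 0x0 + "e" +
+ *     reveventtimestamp + "START"</pre>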
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class LeveldbTimelineStore extends AbstractService
+    implements TimelineStore {
+  private static final Log LOG = LogFactory
+      .getLog(LeveldbTimelineStore.class);
+
+  private static final String FILENAME = "leveldb-timeline-store.ldb";
+
+  private static final byte[] START_TIME_LOOKUP_PREFIX = "k".getBytes();
+  private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes();
+  private static final byte[] INDEXED_ENTRY_PREFIX = "i".getBytes();
+
+  private static final byte[] EVENTS_COLUMN = "e".getBytes();
+  private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes();
+  private static final byte[] OTHER_INFO_COLUMN = "i".getBytes();
+  private static final byte[] RELATED_ENTITIES_COLUMN = "r".getBytes();
+  private static final byte[] INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN =
+      "z".getBytes();
+
+  private static final byte[] EMPTY_BYTES = new byte[0];
+
+  private Map<EntityIdentifier, StartAndInsertTime> startTimeWriteCache;
+  private Map<EntityIdentifier, Long> startTimeReadCache;
+
+  /**
+   * Per-entity locks are obtained when writing.
+   */
+  private final LockMap<EntityIdentifier> writeLocks =
+      new LockMap<EntityIdentifier>();
+
+  private final ReentrantReadWriteLock deleteLock =
+      new ReentrantReadWriteLock();
+
+  private DB db;
+
+  private Thread deletionThread;
+
+  public LeveldbTimelineStore() {
+    super(LeveldbTimelineStore.class.getName());
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected void serviceInit(Configuration conf) throws Exception {
+    Options options = new Options();
+    options.createIfMissing(true);
+    options.cacheSize(conf.getLong(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
+    JniDBFactory factory = new JniDBFactory();
+    String path = conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH);
+    File p = new File(path);
+    if (!p.exists()) {
+      if (!p.mkdirs()) {
+        throw new IOException("Couldn't create directory for leveldb " +
+            "timeline store " + path);
+      }
+    }
+    LOG.info("Using leveldb path " + path);
+    db = factory.open(new File(path, FILENAME), options);
+    startTimeWriteCache =
+        Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(
+            conf)));
+    startTimeReadCache =
+        Collections.synchronizedMap(new LRUMap(getStartTimeReadCacheSize(
+            conf)));
+
+    if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, true)) {
+      deletionThread = new EntityDeletionThread(conf);
+      deletionThread.start();
+    }
+
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (deletionThread != null) {
+      deletionThread.interrupt();
+      LOG.info("Waiting for deletion thread to complete its current action");
+      try {
+        deletionThread.join();
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for deletion thread to complete," +
+            " closing db now", e);
+      }
+    }
+    IOUtils.cleanup(LOG, db);
+    super.serviceStop();
+  }
+
+  private static class StartAndInsertTime {
+    final long startTime;
+    final long insertTime;
+
+    public StartAndInsertTime(long startTime, long insertTime) {
+      this.startTime = startTime;
+      this.insertTime = insertTime;
+    }
+  }
+
+  private class EntityDeletionThread extends Thread {
+    private final long ttl;
+    private final long ttlInterval;
+
+    public EntityDeletionThread(Configuration conf) {
+      ttl  = conf.getLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS,
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS);
+      ttlInterval = conf.getLong(
+          YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
+      LOG.info("Starting deletion thread with ttl " + ttl + " and cycle " +
+          "interval " + ttlInterval);
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        long timestamp = System.currentTimeMillis() - ttl;
+        try {
+          discardOldEntities(timestamp);
+          Thread.sleep(ttlInterval);
+        } catch (IOException e) {
+          LOG.error("Error discarding old entities", e);
+        } catch (InterruptedException e) {
+          LOG.info("Deletion thread received interrupt, exiting");
+          break;
+        }
+      }
+    }
+  }
+
+  private static class LockMap<K> {
+    private static class CountingReentrantLock<K> extends ReentrantLock {
+      private static final long serialVersionUID = 1L;
+      private int count;
+      private K key;
+
+      CountingReentrantLock(K key) {
+        super();
+        this.count = 0;
+        this.key = key;
+      }
+    }
+
+    private Map<K, CountingReentrantLock<K>> locks =
+        new HashMap<K, CountingReentrantLock<K>>();
+
+    synchronized CountingReentrantLock<K> getLock(K key) {
+      CountingReentrantLock<K> lock = locks.get(key);
+      if (lock == null) {
+        lock = new CountingReentrantLock<K>(key);
+        locks.put(key, lock);
+      }
+
+      lock.count++;
+      return lock;
+    }
+
+    synchronized void returnLock(CountingReentrantLock<K> lock) {
+      if (lock.count == 0) {
+        throw new IllegalStateException("Returned lock more times than it " +
+            "was retrieved");
+      }
+      lock.count--;
+
+      if (lock.count == 0) {
+        locks.remove(lock.key);
+      }
+    }
+  }
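+
+  // Usage sketch (the pattern put() follows below): every getLock() is
+  // paired with a returnLock() so the map entry is removed once no
+  // caller holds it:
+  //   CountingReentrantLock<EntityIdentifier> lock = writeLocks.getLock(id);
+  //   lock.lock();
+  //   try { ... } finally { lock.unlock(); writeLocks.returnLock(lock); }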
+
+  private static class KeyBuilder {
+    private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10;
+    private byte[][] b;
+    private boolean[] useSeparator;
+    private int index;
+    private int length;
+
+    public KeyBuilder(int size) {
+      b = new byte[size][];
+      useSeparator = new boolean[size];
+      index = 0;
+      length = 0;
+    }
+
+    public static KeyBuilder newInstance() {
+      return new KeyBuilder(MAX_NUMBER_OF_KEY_ELEMENTS);
+    }
+
+    public KeyBuilder add(String s) {
+      return add(s.getBytes(), true);
+    }
+
+    public KeyBuilder add(byte[] t) {
+      return add(t, false);
+    }
+
+    public KeyBuilder add(byte[] t, boolean sep) {
+      b[index] = t;
+      useSeparator[index] = sep;
+      length += t.length;
+      if (sep) {
+        length++;
+      }
+      index++;
+      return this;
+    }
+
+    public byte[] getBytes() throws IOException {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
+      for (int i = 0; i < index; i++) {
+        baos.write(b[i]);
+        if (i < index-1 && useSeparator[i]) {
+          baos.write(0x0);
+        }
+      }
+      return baos.toByteArray();
+    }
+
+    public byte[] getBytesForLookup() throws IOException {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
+      for (int i = 0; i < index; i++) {
+        baos.write(b[i]);
+        if (useSeparator[i]) {
+          baos.write(0x0);
+        }
+      }
+      return baos.toByteArray();
+    }
+  }
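+
+  // For example (hypothetical elements): newInstance().add(ENTITY_ENTRY_PREFIX)
+  // .add("APP").add("app_1") yields "e" + "APP" + 0x0 + "app_1" from
+  // getBytes(), while getBytesForLookup() also writes the trailing 0x0
+  // after "app_1", making the result usable as an exact scan prefix.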
+
+  private static class KeyParser {
+    private final byte[] b;
+    private int offset;
+
+    public KeyParser(byte[] b, int offset) {
+      this.b = b;
+      this.offset = offset;
+    }
+
+    public String getNextString() throws IOException {
+      if (offset >= b.length) {
+        throw new IOException(
+            "tried to read nonexistent string from byte array");
+      }
+      int i = 0;
+      while (offset+i < b.length && b[offset+i] != 0x0) {
+        i++;
+      }
+      String s = new String(b, offset, i);
+      offset = offset + i + 1;
+      return s;
+    }
+
+    public long getNextLong() throws IOException {
+    if (offset + 8 > b.length) {
+        throw new IOException("byte array ran out when trying to read long");
+      }
+      long l = readReverseOrderedLong(b, offset);
+      offset += 8;
+      return l;
+    }
+
+    public int getOffset() {
+      return offset;
+    }
+  }
+
+  @Override
+  public TimelineEntity getEntity(String entityId, String entityType,
+      EnumSet<Field> fields) throws IOException {
+    Long revStartTime = getStartTimeLong(entityId, entityType);
+    if (revStartTime == null) {
+      return null;
+    }
+    byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
+        .add(entityType).add(writeReverseOrderedLong(revStartTime))
+        .add(entityId).getBytesForLookup();
+
+    DBIterator iterator = null;
+    try {
+      iterator = db.iterator();
+      iterator.seek(prefix);
+
+      return getEntity(entityId, entityType, revStartTime, fields, iterator,
+          prefix, prefix.length);
+    } finally {
+      IOUtils.cleanup(LOG, iterator);
+    }
+  }
+
+  /**
+   * Read entity from a db iterator.  If no information is found in the
+   * specified fields for this entity, return null.
+   */
+  private static TimelineEntity getEntity(String entityId, String entityType,
+      Long startTime, EnumSet<Field> fields, DBIterator iterator,
+      byte[] prefix, int prefixlen) throws IOException {
+    if (fields == null) {
+      fields = EnumSet.allOf(Field.class);
+    }
+
+    TimelineEntity entity = new TimelineEntity();
+    boolean events = false;
+    boolean lastEvent = false;
+    if (fields.contains(Field.EVENTS)) {
+      events = true;
+    } else if (fields.contains(Field.LAST_EVENT_ONLY)) {
+      lastEvent = true;
+    } else {
+      entity.setEvents(null);
+    }
+    boolean relatedEntities = false;
+    if (fields.contains(Field.RELATED_ENTITIES)) {
+      relatedEntities = true;
+    } else {
+      entity.setRelatedEntities(null);
+    }
+    boolean primaryFilters = false;
+    if (fields.contains(Field.PRIMARY_FILTERS)) {
+      primaryFilters = true;
+    } else {
+      entity.setPrimaryFilters(null);
+    }
+    boolean otherInfo = false;
+    if (fields.contains(Field.OTHER_INFO)) {
+      otherInfo = true;
+    } else {
+      entity.setOtherInfo(null);
+    }
+
+    // iterate through the entity's entry, parsing information if it is part
+    // of a requested field
+    for (; iterator.hasNext(); iterator.next()) {
+      byte[] key = iterator.peekNext().getKey();
+      if (!prefixMatches(prefix, prefixlen, key)) {
+        break;
+      }
+      if (key.length == prefixlen) {
+        continue;
+      }
+      if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) {
+        if (primaryFilters) {
+          addPrimaryFilter(entity, key,
+              prefixlen + PRIMARY_FILTERS_COLUMN.length);
+        }
+      } else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) {
+        if (otherInfo) {
+          entity.addOtherInfo(parseRemainingKey(key,
+              prefixlen + OTHER_INFO_COLUMN.length),
+              GenericObjectMapper.read(iterator.peekNext().getValue()));
+        }
+      } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
+        if (relatedEntities) {
+          addRelatedEntity(entity, key,
+              prefixlen + RELATED_ENTITIES_COLUMN.length);
+        }
+      } else if (key[prefixlen] == EVENTS_COLUMN[0]) {
+        if (events || (lastEvent &&
+            entity.getEvents().size() == 0)) {
+          TimelineEvent event = getEntityEvent(null, key, prefixlen +
+              EVENTS_COLUMN.length, iterator.peekNext().getValue());
+          if (event != null) {
+            entity.addEvent(event);
+          }
+        }
+      } else {
+        if (key[prefixlen] !=
+            INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN[0]) {
+          LOG.warn(String.format("Found unexpected column for entity %s of " +
+              "type %s (0x%02x)", entityId, entityType, key[prefixlen]));
+        }
+      }
+    }
+
+    entity.setEntityId(entityId);
+    entity.setEntityType(entityType);
+    entity.setStartTime(startTime);
+
+    return entity;
+  }
+
+  @Override
+  public TimelineEvents getEntityTimelines(String entityType,
+      SortedSet<String> entityIds, Long limit, Long windowStart,
+      Long windowEnd, Set<String> eventType) throws IOException {
+    TimelineEvents events = new TimelineEvents();
+    if (entityIds == null || entityIds.isEmpty()) {
+      return events;
+    }
+    // create a lexicographically-ordered map from start time to entities
+    Map<byte[], List<EntityIdentifier>> startTimeMap = new TreeMap<byte[],
+        List<EntityIdentifier>>(new Comparator<byte[]>() {
+          @Override
+          public int compare(byte[] o1, byte[] o2) {
+            return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0,
+                o2.length);
+          }
+        });
+    DBIterator iterator = null;
+    try {
+      // look up start times for the specified entities
+      // skip entities with no start time
+      for (String entityId : entityIds) {
+        byte[] startTime = getStartTime(entityId, entityType);
+        if (startTime != null) {
+          List<EntityIdentifier> entities = startTimeMap.get(startTime);
+          if (entities == null) {
+            entities = new ArrayList<EntityIdentifier>();
+            startTimeMap.put(startTime, entities);
+          }
+          entities.add(new EntityIdentifier(entityId, entityType));
+        }
+      }
+      for (Entry<byte[], List<EntityIdentifier>> entry :
+          startTimeMap.entrySet()) {
+        // look up the events matching the given parameters (limit,
+        // start time, end time, event types) for entities whose start times
+        // were found and add the entities to the return list
+        byte[] revStartTime = entry.getKey();
+        for (EntityIdentifier entityIdentifier : entry.getValue()) {
+          EventsOfOneEntity entity = new EventsOfOneEntity();
+          entity.setEntityId(entityIdentifier.getId());
+          entity.setEntityType(entityType);
+          events.addEvent(entity);
+          KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
+              .add(entityType).add(revStartTime).add(entityIdentifier.getId())
+              .add(EVENTS_COLUMN);
+          byte[] prefix = kb.getBytesForLookup();
+          if (windowEnd == null) {
+            windowEnd = Long.MAX_VALUE;
+          }
+          byte[] revts = writeReverseOrderedLong(windowEnd);
+          kb.add(revts);
+          byte[] first = kb.getBytesForLookup();
+          byte[] last = null;
+          if (windowStart != null) {
+            last = KeyBuilder.newInstance().add(prefix)
+                .add(writeReverseOrderedLong(windowStart)).getBytesForLookup();
+          }
+          if (limit == null) {
+            limit = DEFAULT_LIMIT;
+          }
+          // release the iterator left over from the previous entity, if
+          // any, so it is not leaked when a new one is opened
+          IOUtils.cleanup(LOG, iterator);
+          iterator = db.iterator();
+          for (iterator.seek(first); entity.getEvents().size() < limit &&
+              iterator.hasNext(); iterator.next()) {
+            byte[] key = iterator.peekNext().getKey();
+            if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
+                WritableComparator.compareBytes(key, 0, key.length, last, 0,
+                    last.length) > 0)) {
+              break;
+            }
+            TimelineEvent event = getEntityEvent(eventType, key, prefix.length,
+                iterator.peekNext().getValue());
+            if (event != null) {
+              entity.addEvent(event);
+            }
+          }
+        }
+      }
+    } finally {
+      IOUtils.cleanup(LOG, iterator);
+    }
+    return events;
+  }
+
+  /**
+   * Returns true if the byte array begins with the specified prefix.
+   */
+  private static boolean prefixMatches(byte[] prefix, int prefixlen,
+      byte[] b) {
+    if (b.length < prefixlen) {
+      return false;
+    }
+    return WritableComparator.compareBytes(prefix, 0, prefixlen, b, 0,
+        prefixlen) == 0;
+  }
+
+  @Override
+  public TimelineEntities getEntities(String entityType,
+      Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs,
+      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+      EnumSet<Field> fields) throws IOException {
+    if (primaryFilter == null) {
+      // if no primary filter is specified, prefix the lookup with
+      // ENTITY_ENTRY_PREFIX
+      return getEntityByTime(ENTITY_ENTRY_PREFIX, entityType, limit,
+          windowStart, windowEnd, fromId, fromTs, secondaryFilters, fields);
+    } else {
+      // if a primary filter is specified, prefix the lookup with
+      // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
+      // ENTITY_ENTRY_PREFIX
+      byte[] base = KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX)
+          .add(primaryFilter.getName())
+          .add(GenericObjectMapper.write(primaryFilter.getValue()), true)
+          .add(ENTITY_ENTRY_PREFIX).getBytesForLookup();
+      return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
+          fromId, fromTs, secondaryFilters, fields);
+    }
+  }
+
+  /**
+   * Retrieves a list of entities satisfying given parameters.
+   *
+   * @param base A byte array prefix for the lookup
+   * @param entityType The type of the entity
+   * @param limit A limit on the number of entities to return
+   * @param starttime The earliest entity start time to retrieve (exclusive)
+   * @param endtime The latest entity start time to retrieve (inclusive)
+   * @param fromId Retrieve entities starting with this entity
+   * @param fromTs Ignore entities with insert timestamp later than this ts
+   * @param secondaryFilters Filter pairs that the entities should match
+   * @param fields The set of fields to retrieve
+   * @return A list of entities
+   * @throws IOException
+   */
+  private TimelineEntities getEntityByTime(byte[] base,
+      String entityType, Long limit, Long starttime, Long endtime,
+      String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters,
+      EnumSet<Field> fields) throws IOException {
+    DBIterator iterator = null;
+    try {
+      KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
+      // only db keys matching the prefix (base + entity type) will be parsed
+      byte[] prefix = kb.getBytesForLookup();
+      if (endtime == null) {
+        // if end time is null, place no restriction on end time
+        endtime = Long.MAX_VALUE;
+      }
+      // construct a first key that will be seeked to using end time or fromId
+      byte[] first = null;
+      if (fromId != null) {
+        Long fromIdStartTime = getStartTimeLong(fromId, entityType);
+        if (fromIdStartTime == null) {
+          // no start time for provided id, so return empty entities
+          return new TimelineEntities();
+        }
+        if (fromIdStartTime <= endtime) {
+          // if provided id's start time falls before the end of the window,
+          // use it to construct the seek key
+          first = kb.add(writeReverseOrderedLong(fromIdStartTime))
+              .add(fromId).getBytesForLookup();
+        }
+      }
+      // if seek key wasn't constructed using fromId, construct it using end ts
+      if (first == null) {
+        first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup();
+      }
+      byte[] last = null;
+      if (starttime != null) {
+        // if start time is not null, set a last key that will not be
+        // iterated past
+        last = KeyBuilder.newInstance().add(base).add(entityType)
+            .add(writeReverseOrderedLong(starttime)).getBytesForLookup();
+      }
+      if (limit == null) {
+        // if limit is not specified, use the default
+        limit = DEFAULT_LIMIT;
+      }
+
+      TimelineEntities entities = new TimelineEntities();
+      iterator = db.iterator();
+      iterator.seek(first);
+      // iterate until one of the following conditions is met: limit is
+      // reached, there are no more keys, the key prefix no longer matches,
+      // or a start time has been specified and reached/exceeded
+      while (entities.getEntities().size() < limit && iterator.hasNext()) {
+        byte[] key = iterator.peekNext().getKey();
+        if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
+            WritableComparator.compareBytes(key, 0, key.length, last, 0,
+                last.length) > 0)) {
+          break;
+        }
+        // read the start time and entity id from the current key
+        KeyParser kp = new KeyParser(key, prefix.length);
+        Long startTime = kp.getNextLong();
+        String entityId = kp.getNextString();
+
+        if (fromTs != null) {
+          long insertTime = readReverseOrderedLong(iterator.peekNext()
+              .getValue(), 0);
+          if (insertTime > fromTs) {
+            // skip the remaining keys of this entity, checking hasNext()
+            // before each peek so the iterator is never peeked past the
+            // end of the db
+            byte[] firstKey = key;
+            while (iterator.hasNext()) {
+              key = iterator.peekNext().getKey();
+              if (!prefixMatches(firstKey, kp.getOffset(), key)) {
+                break;
+              }
+              iterator.next();
+            }
+            continue;
+          }
+        }
+
+        // parse the entity that owns this key, iterating over all keys for
+        // the entity
+        TimelineEntity entity = getEntity(entityId, entityType, startTime,
+            fields, iterator, key, kp.getOffset());
+        // determine if the retrieved entity matches the provided secondary
+        // filters, and if so add it to the list of entities to return
+        boolean filterPassed = true;
+        if (secondaryFilters != null) {
+          for (NameValuePair filter : secondaryFilters) {
+            Object v = entity.getOtherInfo().get(filter.getName());
+            if (v == null) {
+              Set<Object> vs = entity.getPrimaryFilters()
+                  .get(filter.getName());
+              if (vs != null && !vs.contains(filter.getValue())) {
+                filterPassed = false;
+                break;
+              }
+            } else if (!v.equals(filter.getValue())) {
+              filterPassed = false;
+              break;
+            }
+          }
+        }
+        if (filterPassed) {
+          entities.addEntity(entity);
+        }
+      }
+      return entities;
+    } finally {
+      IOUtils.cleanup(LOG, iterator);
+    }
+  }
+
+  /**
+   * Put a single entity.  If there is an error, add a TimelinePutError to the
+   * given response.
+   */
+  private void put(TimelineEntity entity, TimelinePutResponse response) {
+    LockMap.CountingReentrantLock<EntityIdentifier> lock =
+        writeLocks.getLock(new EntityIdentifier(entity.getEntityId(),
+            entity.getEntityType()));
+    lock.lock();
+    WriteBatch writeBatch = null;
+    List<EntityIdentifier> relatedEntitiesWithoutStartTimes =
+        new ArrayList<EntityIdentifier>();
+    byte[] revStartTime = null;
+    try {
+      writeBatch = db.createWriteBatch();
+      List<TimelineEvent> events = entity.getEvents();
+      // look up the start time for the entity
+      StartAndInsertTime startAndInsertTime = getAndSetStartTime(
+          entity.getEntityId(), entity.getEntityType(),
+          entity.getStartTime(), events);
+      if (startAndInsertTime == null) {
+        // if no start time is found, add an error and return
+        TimelinePutError error = new TimelinePutError();
+        error.setEntityId(entity.getEntityId());
+        error.setEntityType(entity.getEntityType());
+        error.setErrorCode(TimelinePutError.NO_START_TIME);
+        response.addError(error);
+        return;
+      }
+      revStartTime = writeReverseOrderedLong(startAndInsertTime
+          .startTime);
+
+      Map<String, Set<Object>> primaryFilters = entity.getPrimaryFilters();
+
+      // write entity marker
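+      // (the marker's value carries the insert time, which
+      // getEntityByTime later reads to enforce the fromTs window)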
+      byte[] markerKey = createEntityMarkerKey(entity.getEntityId(),
+          entity.getEntityType(), revStartTime);
+      byte[] markerValue = writeReverseOrderedLong(startAndInsertTime
+          .insertTime);
+      writeBatch.put(markerKey, markerValue);
+      writePrimaryFilterEntries(writeBatch, primaryFilters, markerKey,
+          markerValue);
+
+      // write event entries
+      if (events != null && !events.isEmpty()) {
+        for (TimelineEvent event : events) {
+          byte[] revts = writeReverseOrderedLong(event.getTimestamp());
+          byte[] key = createEntityEventKey(entity.getEntityId(),
+              entity.getEntityType(), revStartTime, revts,
+              event.getEventType());
+          byte[] value = GenericObjectMapper.write(event.getEventInfo());
+          writeBatch.put(key, value);
+          writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
+        }
+      }
+
+      // write related entity entries
+      Map<String, Set<String>> relatedEntities =
+          entity.getRelatedEntities();
+      if (relatedEntities != null && !relatedEntities.isEmpty()) {
+        for (Entry<String, Set<String>> relatedEntityList :
+            relatedEntities.entrySet()) {
+          String relatedEntityType = relatedEntityList.getKey();
+          for (String relatedEntityId : relatedEntityList.getValue()) {
+            // invisible "reverse" entries (entity -> related entity)
+            byte[] key = createReverseRelatedEntityKey(entity.getEntityId(),
+                entity.getEntityType(), revStartTime, relatedEntityId,
+                relatedEntityType);
+            writeBatch.put(key, EMPTY_BYTES);
+            // look up start time of related entity
+            byte[] relatedEntityStartTime = getStartTime(relatedEntityId,
+                relatedEntityType);
+            // delay writing the related entity if no start time is found
+            if (relatedEntityStartTime == null) {
+              relatedEntitiesWithoutStartTimes.add(
+                  new EntityIdentifier(relatedEntityId, relatedEntityType));
+              continue;
+            }
+            // write "forward" entry (related entity -> entity)
+            key = createRelatedEntityKey(relatedEntityId,
+                relatedEntityType, relatedEntityStartTime,
+                entity.getEntityId(), entity.getEntityType());
+            writeBatch.put(key, EMPTY_BYTES);
+          }
+        }
+      }
+
+      // write primary filter entries
+      if (primaryFilters != null && !primaryFilters.isEmpty()) {
+        for (Entry<String, Set<Object>> primaryFilter :
+            primaryFilters.entrySet()) {
+          for (Object primaryFilterValue : primaryFilter.getValue()) {
+            byte[] key = createPrimaryFilterKey(entity.getEntityId(),
+                entity.getEntityType(), revStartTime,
+                primaryFilter.getKey(), primaryFilterValue);
+            writeBatch.put(key, EMPTY_BYTES);
+            writePrimaryFilterEntries(writeBatch, primaryFilters, key,
+                EMPTY_BYTES);
+          }
+        }
+      }
+
+      // write other info entries
+      Map<String, Object> otherInfo = entity.getOtherInfo();
+      if (otherInfo != null && !otherInfo.isEmpty()) {
+        for (Entry<String, Object> i : otherInfo.entrySet()) {
+          byte[] key = createOtherInfoKey(entity.getEntityId(),
+              entity.getEntityType(), revStartTime, i.getKey());
+          byte[] value = GenericObjectMapper.write(i.getValue());
+          writeBatch.put(key, value);
+          writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
+        }
+      }
+      db.write(writeBatch);
+    } catch (IOException e) {
+      LOG.error("Error putting entity " + entity.getEntityId() +
+          " of type " + entity.getEntityType(), e);
+      TimelinePutError error = new TimelinePutError();
+      error.setEntityId(entity.getEntityId());
+      error.setEntityType(entity.getEntityType());
+      error.setErrorCode(TimelinePutError.IO_EXCEPTION);
+      response.addError(error);
+    } finally {
+      lock.unlock();
+      writeLocks.returnLock(lock);
+      IOUtils.cleanup(LOG, writeBatch);
+    }
+
+    for (EntityIdentifier relatedEntity : relatedEntitiesWithoutStartTimes) {
+      lock = writeLocks.getLock(relatedEntity);
+      lock.lock();
+      try {
+        StartAndInsertTime relatedEntityStartAndInsertTime =
+            getAndSetStartTime(relatedEntity.getId(), relatedEntity.getType(),
+            readReverseOrderedLong(revStartTime, 0), null);
+        if (relatedEntityStartAndInsertTime == null) {
+          throw new IOException("Error setting start time for related entity");
+        }
+        byte[] relatedEntityStartTime = writeReverseOrderedLong(
+            relatedEntityStartAndInsertTime.startTime);
+        db.put(createRelatedEntityKey(relatedEntity.getId(),
+            relatedEntity.getType(), relatedEntityStartTime,
+            entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES);
+        db.put(createEntityMarkerKey(relatedEntity.getId(),
+            relatedEntity.getType(), relatedEntityStartTime),
+            writeReverseOrderedLong(relatedEntityStartAndInsertTime
+                .insertTime));
+      } catch (IOException e) {
+        LOG.error("Error putting related entity " + relatedEntity.getId() +
+            " of type " + relatedEntity.getType() + " for entity " +
+            entity.getEntityId() + " of type " + entity.getEntityType(), e);
+        TimelinePutError error = new TimelinePutError();
+        error.setEntityId(entity.getEntityId());
+        error.setEntityType(entity.getEntityType());
+        error.setErrorCode(TimelinePutError.IO_EXCEPTION);
+        response.addError(error);
+      } finally {
+        lock.unlock();
+        writeLocks.returnLock(lock);
+      }
+    }
+  }
+
+  /**
+   * For a given key / value pair that has been written to the db,
+   * write additional entries to the db for each primary filter.
+   */
+  private static void writePrimaryFilterEntries(WriteBatch writeBatch,
+      Map<String, Set<Object>> primaryFilters, byte[] key, byte[] value)
+      throws IOException {
+    if (primaryFilters != null && !primaryFilters.isEmpty()) {
+      for (Entry<String, Set<Object>> pf : primaryFilters.entrySet()) {
+        for (Object pfval : pf.getValue()) {
+          writeBatch.put(addPrimaryFilterToKey(pf.getKey(), pfval,
+              key), value);
+        }
+      }
+    }
+  }
+
+  @Override
+  public TimelinePutResponse put(TimelineEntities entities) {
+    try {
+      deleteLock.readLock().lock();
+      TimelinePutResponse response = new TimelinePutResponse();
+      for (TimelineEntity entity : entities.getEntities()) {
+        put(entity, response);
+      }
+      return response;
+    } finally {
+      deleteLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Get the unique start time for a given entity as a byte array that sorts
+   * the timestamps in reverse order (see {@link
+   * GenericObjectMapper#writeReverseOrderedLong(long)}).
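+   * The encoding makes unsigned lexicographic byte order match descending
+   * numeric order, which is what keeps more recently started entities
+   * first in the entity section.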
+   *
+   * @param entityId The id of the entity
+   * @param entityType The type of the entity
+   * @return A byte array, null if not found
+   * @throws IOException
+   */
+  private byte[] getStartTime(String entityId, String entityType)
+      throws IOException {
+    Long l = getStartTimeLong(entityId, entityType);
+    return l == null ? null : writeReverseOrderedLong(l);
+  }
+
+  /**
+   * Get the unique start time for a given entity as a Long.
+   *
+   * @param entityId The id of the entity
+   * @param entityType The type of the entity
+   * @return A Long, null if not found
+   * @throws IOException
+   */
+  private Long getStartTimeLong(String entityId, String entityType)
+      throws IOException {
+    EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
+    // start time is not provided, so try to look it up
+    if (startTimeReadCache.containsKey(entity)) {
+      // found the start time in the cache
+      return startTimeReadCache.get(entity);
+    } else {
+      // try to look up the start time in the db
+      byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
+      byte[] v = db.get(b);
+      if (v == null) {
+        // did not find the start time in the db
+        return null;
+      } else {
+        // found the start time in the db
+        Long l = readReverseOrderedLong(v, 0);
+        startTimeReadCache.put(entity, l);
+        return l;
+      }
+    }
+  }
+
+  /**
+   * Get the unique start time for a given entity as a byte array that sorts
+   * the timestamps in reverse order (see {@link
+   * GenericObjectMapper#writeReverseOrderedLong(long)}). If the start time
+   * doesn't exist, set it based on the information provided. Should only be
+   * called when a lock has been obtained on the entity.
+   *
+   * @param entityId The id of the entity
+   * @param entityType The type of the entity
+   * @param startTime The start time of the entity, or null
+   * @param events A list of events for the entity, or null
+   * @return A StartAndInsertTime
+   * @throws IOException
+   */
+  private StartAndInsertTime getAndSetStartTime(String entityId,
+      String entityType, Long startTime, List<TimelineEvent> events)
+      throws IOException {
+    EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
+    if (startTime == null) {
+      // start time is not provided, so try to look it up
+      if (startTimeWriteCache.containsKey(entity)) {
+        // found the start time in the cache
+        return startTimeWriteCache.get(entity);
+      } else {
+        if (events != null) {
+          // prepare a start time from events in case it is needed
+          Long min = Long.MAX_VALUE;
+          for (TimelineEvent e : events) {
+            if (min > e.getTimestamp()) {
+              min = e.getTimestamp();
+            }
+          }
+          startTime = min;
+        }
+        return checkStartTimeInDb(entity, startTime);
+      }
+    } else {
+      // start time is provided
+      if (startTimeWriteCache.containsKey(entity)) {
+        // always use start time from cache if it exists
+        return startTimeWriteCache.get(entity);
+      } else {
+        // check the provided start time matches the db
+        return checkStartTimeInDb(entity, startTime);
+      }
+    }
+  }
+
+  /**
+   * Checks db for start time and returns it if it exists.  If it doesn't
+   * exist, writes the suggested start time (if it is not null).  This is
+   * only called when the start time is not found in the cache,
+   * so it adds it back into the cache if it is found. Should only be called
+   * when a lock has been obtained on the entity.
+   */
+  private StartAndInsertTime checkStartTimeInDb(EntityIdentifier entity,
+      Long suggestedStartTime) throws IOException {
+    StartAndInsertTime startAndInsertTime = null;
+    // create lookup key for start time
+    byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
+    // retrieve value for key
+    byte[] v = db.get(b);
+    if (v == null) {
+      // start time doesn't exist in db
+      if (suggestedStartTime == null) {
+        return null;
+      }
+      startAndInsertTime = new StartAndInsertTime(suggestedStartTime,
+          System.currentTimeMillis());
+
+      // write suggested start time
+      v = new byte[16];
+      writeReverseOrderedLong(suggestedStartTime, v, 0);
+      writeReverseOrderedLong(startAndInsertTime.insertTime, v, 8);
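+      // this write is synced below: later entity writes assume the
+      // canonical start time record is durable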
+      WriteOptions writeOptions = new WriteOptions();
+      writeOptions.sync(true);
+      db.put(b, v, writeOptions);
+    } else {
+      // found start time in db, so ignore suggested start time
+      startAndInsertTime = new StartAndInsertTime(readReverseOrderedLong(v, 0),
+          readReverseOrderedLong(v, 8));
+    }
+    startTimeWriteCache.put(entity, startAndInsertTime);
+    startTimeReadCache.put(entity, startAndInsertTime.startTime);
+    return startAndInsertTime;
+  }
+
+  /**
+   * Creates a key for looking up the start time of a given entity,
+   * of the form START_TIME_LOOKUP_PREFIX + entity type + entity id.
+   */
+  private static byte[] createStartTimeLookupKey(String entityId,
+      String entityType) throws IOException {
+    return KeyBuilder.newInstance().add(START_TIME_LOOKUP_PREFIX)
+        .add(entityType).add(entityId).getBytes();
+  }
+
+  /**
+   * Creates an entity marker, serializing ENTITY_ENTRY_PREFIX + entity type +
+   * revstarttime + entity id.
+   */
+  private static byte[] createEntityMarkerKey(String entityId,
+      String entityType, byte[] revStartTime) throws IOException {
+    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
+        .add(entityType).add(revStartTime).add(entityId).getBytesForLookup();
+  }
+
+  /**
+   * Creates an index entry for the given key of the form
+   * INDEXED_ENTRY_PREFIX + primaryfiltername + primaryfiltervalue + key.
+   */
+  private static byte[] addPrimaryFilterToKey(String primaryFilterName,
+      Object primaryFilterValue, byte[] key) throws IOException {
+    return KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX)
+        .add(primaryFilterName)
+        .add(GenericObjectMapper.write(primaryFilterValue), true).add(key)
+        .getBytes();
+  }
+
+  /**
+   * Creates an event key, serializing ENTITY_ENTRY_PREFIX + entity type +
+   * revstarttime + entity id + EVENTS_COLUMN + reveventtimestamp + event type.
+   */
+  private static byte[] createEntityEventKey(String entityId,
+      String entityType, byte[] revStartTime, byte[] revEventTimestamp,
+      String eventType) throws IOException {
+    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
+        .add(entityType).add(revStartTime).add(entityId).add(EVENTS_COLUMN)
+        .add(revEventTimestamp).add(eventType).getBytes();
+  }
+
+  /**
+   * Creates an event object from the given key, offset, and value.  If the
+   * event type is not contained in the specified set of event types,
+   * returns null.
+   */
+  private static TimelineEvent getEntityEvent(Set<String> eventTypes,
+      byte[] key, int offset, byte[] value) throws IOException {
+    KeyParser kp = new KeyParser(key, offset);
+    long ts = kp.getNextLong();
+    String tstype = kp.getNextString();
+    if (eventTypes == null || eventTypes.contains(tstype)) {
+      TimelineEvent event = new TimelineEvent();
+      event.setTimestamp(ts);
+      event.setEventType(tstype);
+      Object o = GenericObjectMapper.read(value);
+      if (o == null) {
+        event.setEventInfo(null);
+      } else if (o instanceof Map) {
+        @SuppressWarnings("unchecked")
+        Map<String, Object> m = (Map<String, Object>) o;
+        event.setEventInfo(m);
+      } else {
+        throw new IOException("Couldn't deserialize event info map");
+      }
+      return event;
+    }
+    return null;
+  }
+
+  /**
+   * Creates a primary filter key, serializing ENTITY_ENTRY_PREFIX +
+   * entity type + revstarttime + entity id + PRIMARY_FILTERS_COLUMN + name +
+   * value.
+   */
+  private static byte[] createPrimaryFilterKey(String entityId,
+      String entityType, byte[] revStartTime, String name, Object value)
+      throws IOException {
+    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
+        .add(revStartTime).add(entityId).add(PRIMARY_FILTERS_COLUMN).add(name)
+        .add(GenericObjectMapper.write(value)).getBytes();
+  }
+
+  /**
+   * Parses the primary filter from the given key at the given offset and
+   * adds it to the given entity.
+   */
+  private static void addPrimaryFilter(TimelineEntity entity, byte[] key,
+      int offset) throws IOException {
+    KeyParser kp = new KeyParser(key, offset);
+    String name = kp.getNextString();
+    Object value = GenericObjectMapper.read(key, kp.getOffset());
+    entity.addPrimaryFilter(name, value);
+  }
+
+  /**
+   * Creates an other info key, serializing ENTITY_ENTRY_PREFIX + entity type +
+   * revstarttime + entity id + OTHER_INFO_COLUMN + name.
+   */
+  private static byte[] createOtherInfoKey(String entityId, String entityType,
+      byte[] revStartTime, String name) throws IOException {
+    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
+        .add(revStartTime).add(entityId).add(OTHER_INFO_COLUMN).add(name)
+        .getBytes();
+  }
+
+  /**
+   * Creates a string representation of the byte array from the given offset
+   * to the end of the array (for parsing other info keys).
+   */
+  private static String parseRemainingKey(byte[] b, int offset) {
+    return new String(b, offset, b.length - offset);
+  }
+
+  /**
+   * Creates a related entity key, serializing ENTITY_ENTRY_PREFIX +
+   * entity type + revstarttime + entity id + RELATED_ENTITIES_COLUMN +
+   * relatedentity type + relatedentity id.
+   */
+  private static byte[] createRelatedEntityKey(String entityId,
+      String entityType, byte[] revStartTime, String relatedEntityId,
+      String relatedEntityType) throws IOException {
+    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
+        .add(revStartTime).add(entityId).add(RELATED_ENTITIES_COLUMN)
+        .add(relatedEntityType).add(relatedEntityId).getBytes();
+  }
+
+  /**
+   * Parses the related entity from the given key at the given offset and
+   * adds it to the given entity.
+   */
+  private static void addRelatedEntity(TimelineEntity entity, byte[] key,
+      int offset) throws IOException {
+    KeyParser kp = new KeyParser(key, offset);
+    String type = kp.getNextString();
+    String id = kp.getNextString();
+    entity.addRelatedEntity(type, id);
+  }
+
+  /**
+   * Creates a reverse related entity key, serializing ENTITY_ENTRY_PREFIX +
+   * entity type + revstarttime + entity id +
+   * INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN +
+   * relatedentity type + relatedentity id.
+   */
+  private static byte[] createReverseRelatedEntityKey(String entityId,
+      String entityType, byte[] revStartTime, String relatedEntityId,
+      String relatedEntityType) throws IOException {
+    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
+        .add(revStartTime).add(entityId)
+        .add(INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN)
+        .add(relatedEntityType).add(relatedEntityId).getBytes();
+  }
+
+  /**
+   * Clears the cache to test reloading start times from leveldb (only for
+   * testing).
+   */
+  @VisibleForTesting
+  void clearStartTimeCache() {
+    startTimeWriteCache.clear();
+    startTimeReadCache.clear();
+  }
+
+  @VisibleForTesting
+  static int getStartTimeReadCacheSize(Configuration conf) {
+    return conf.getInt(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
+        YarnConfiguration.
+            DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE);
+  }
+
+  @VisibleForTesting
+  static int getStartTimeWriteCacheSize(Configuration conf) {
+    return conf.getInt(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
+        YarnConfiguration.
+            DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
+  }
+
+  // warning is suppressed to prevent eclipse from noting unclosed resource
+  @SuppressWarnings("resource")
+  @VisibleForTesting
+  List<String> getEntityTypes() throws IOException {
+    DBIterator iterator = null;
+    try {
+      iterator = getDbIterator(false);
+      List<String> entityTypes = new ArrayList<String>();
+      iterator.seek(ENTITY_ENTRY_PREFIX);
+      while (iterator.hasNext()) {
+        byte[] key = iterator.peekNext().getKey();
+        if (key[0] != ENTITY_ENTRY_PREFIX[0]) {
+          break;
+        }
+        KeyParser kp = new KeyParser(key,
+            ENTITY_ENTRY_PREFIX.length);
+        String entityType = kp.getNextString();
+        entityTypes.add(entityType);
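+        // skip-scan to the next type: the lookup key ends in the 0x0
+        // separator, so bumping that final byte to 0x1 seeks the iterator
+        // past every remaining key of the current entity type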
+        byte[] lookupKey = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
+            .add(entityType).getBytesForLookup();
+        if (lookupKey[lookupKey.length - 1] != 0x0) {
+          throw new IOException("Found unexpected end byte in lookup key");
+        }
+        lookupKey[lookupKey.length - 1] = 0x1;
+        iterator.seek(lookupKey);
+      }
+      return entityTypes;
+    } finally {
+      IOUtils.cleanup(LOG, iterator);
+    }
+  }
+
+  /**
+   * Finds all keys in the db that have a given prefix and deletes them on
+   * the given write batch.
+   */
+  private void deleteKeysWithPrefix(WriteBatch writeBatch, byte[] prefix,
+      DBIterator iterator) {
+    for (iterator.seek(prefix); iterator.hasNext(); iterator.next()) {
+      byte[] key = iterator.peekNext().getKey();
+      if (!prefixMatches(prefix, prefix.length, key)) {
+        break;
+      }
+      writeBatch.delete(key);
+    }
+  }
+
+  @VisibleForTesting
+  boolean deleteNextEntity(String entityType, byte[] reverseTimestamp,
+      DBIterator iterator, DBIterator pfIterator, boolean seeked)
+      throws IOException {
+    WriteBatch writeBatch = null;
+    try {
+      KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
+          .add(entityType);
+      byte[] typePrefix = kb.getBytesForLookup();
+      kb.add(reverseTimestamp);
+      if (!seeked) {
+        iterator.seek(kb.getBytesForLookup());
+      }
+      if (!iterator.hasNext()) {
+        return false;
+      }
+      byte[] entityKey = iterator.peekNext().getKey();
+      if (!prefixMatches(typePrefix, typePrefix.length, entityKey)) {
+        return false;
+      }
+
+      // read the start time and entity id from the current key
+      KeyParser kp = new KeyParser(entityKey, typePrefix.length + 8);
+      String entityId = kp.getNextString();
+      int prefixlen = kp.getOffset();
+      byte[] deletePrefix = new byte[prefixlen];
+      System.arraycopy(entityKey, 0, deletePrefix, 0, prefixlen);
+
+      writeBatch = db.createWriteBatch();
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Deleting entity type:" + entityType + " id:" + entityId);
+      }
+      // remove start time from cache and db
+      writeBatch.delete(createStartTimeLookupKey(entityId, entityType));
+      EntityIdentifier entityIdentifier =
+          new EntityIdentifier(entityId, entityType);
+      startTimeReadCache.remove(entityIdentifier);
+      startTimeWriteCache.remove(entityIdentifier);
+
+      // delete current entity
+      for (; iterator.hasNext(); iterator.next()) {
+        byte[] key = iterator.peekNext().getKey();
+        if (!prefixMatches(entityKey, prefixlen, key)) {
+          break;
+        }
+        writeBatch.delete(key);
+
+        if (key.length == prefixlen) {
+          continue;
+        }
+        if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) {
+          kp = new KeyParser(key,
+              prefixlen + PRIMARY_FILTERS_COLUMN.length);
+          String name = kp.getNextString();
+          Object value = GenericObjectMapper.read(key, kp.getOffset());
+          deleteKeysWithPrefix(writeBatch, addPrimaryFilterToKey(name, value,
+              deletePrefix), pfIterator);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Deleting entity type:" + entityType + " id:" +
+                entityId + " primary filter entry " + name + " " +
+                value);
+          }
+        } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
+          kp = new KeyParser(key,
+              prefixlen + RELATED_ENTITIES_COLUMN.length);
+          String type = kp.getNextString();
+          String id = kp.getNextString();
+          byte[] relatedEntityStartTime = getStartTime(id, type);
+          if (relatedEntityStartTime == null) {
+            LOG.warn("Found no start time for " +
+                "related entity " + id + " of type " + type + " while " +
+                "deleting " + entityId + " of type " + entityType);
+            continue;
+          }
+          writeBatch.delete(createReverseRelatedEntityKey(id, type,
+              relatedEntityStartTime, entityId, entityType));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Deleting entity type:" + entityType + " id:" +
+                entityId + " from invisible reverse related entity " +
+                "entry of type:" + type + " id:" + id);
+          }
+        } else if (key[prefixlen] ==
+            INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN[0]) {
+          kp = new KeyParser(key, prefixlen +
+              INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN.length);
+          String type = kp.getNextString();
+          String id = kp.getNextString();
+          byte[] relatedEntityStartTime = getStartTime(id, type);
+          if (relatedEntityStartTime == null) {
+            LOG.warn("Found no start time for reverse " +
+                "related entity " + id + " of type " + type + " while " +
+                "deleting " + entityId + " of type " + entityType);
+            continue;
+          }
+          writeBatch.delete(createRelatedEntityKey(id, type,
+              relatedEntityStartTime, entityId, entityType));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Deleting entity type:" + entityType + " id:" +
+                entityId + " from related entity entry of type:" +
+                type + " id:" + id);
+          }
+        }
+      }
+      WriteOptions writeOptions = new WriteOptions();
+      writeOptions.sync(true);
+      db.write(writeBatch, writeOptions);
+      return true;
+    } finally {
+      IOUtils.cleanup(LOG, writeBatch);
+    }
+  }
+
+  /**
+   * Discards entities with start timestamp less than or equal to the given
+   * timestamp.
+   */
+  @VisibleForTesting
+  void discardOldEntities(long timestamp)
+      throws IOException, InterruptedException {
+    byte[] reverseTimestamp = writeReverseOrderedLong(timestamp);
+    long totalCount = 0;
+    long t1 = System.currentTimeMillis();
+    try {
+      List<String> entityTypes = getEntityTypes();
+      for (String entityType : entityTypes) {
+        DBIterator iterator = null;
+        DBIterator pfIterator = null;
+        long typeCount = 0;
+        try {
+          deleteLock.writeLock().lock();
+          iterator = getDbIterator(false);
+          pfIterator = getDbIterator(false);
+
+          if (deletionThread != null && deletionThread.isInterrupted()) {
+            throw new InterruptedException();
+          }
+          boolean seeked = false;
+          while (deleteNextEntity(entityType, reverseTimestamp, iterator,
+              pfIterator, seeked)) {
+            typeCount++;
+            totalCount++;
+            seeked = true;
+            if (deletionThread != null && deletionThread.isInterrupted()) {
+              throw new InterruptedException();
+            }
+          }
+        } catch (IOException e) {
+          LOG.error("Got IOException while deleting entities for type " +
+              entityType + ", continuing to next type", e);
+        } finally {
+          IOUtils.cleanup(LOG, iterator, pfIterator);
+          deleteLock.writeLock().unlock();
+          if (typeCount > 0) {
+            LOG.info("Deleted " + typeCount + " entities of type " +
+                entityType);
+          }
+        }
+      }
+    } finally {
+      long t2 = System.currentTimeMillis();
+      LOG.info("Discarded " + totalCount + " entities for timestamp " +
+          timestamp + " and earlier in " + (t2 - t1) / 1000.0 + " seconds");
+    }
+  }
+
+  @VisibleForTesting
+  DBIterator getDbIterator(boolean fillCache) {
+    ReadOptions readOptions = new ReadOptions();
+    readOptions.fillCache(fillCache);
+    return db.iterator(readOptions);
+  }
+}
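
For orientation: discardOldEntities() above works because start times are
encoded with writeReverseOrderedLong() before being embedded in LevelDB keys,
so newer entities sort first under LevelDB's default byte-wise comparator.
The sketch below is a minimal re-derivation of that encoding for
illustration; ReverseOrderDemo and compareUnsigned are names invented here,
not part of the patch (the real helper, GenericObjectMapper's
writeReverseOrderedLong, lives in this same package).

public class ReverseOrderDemo {

  // Encode a long so that larger values yield lexicographically smaller
  // byte arrays: flip the sign bit of the first byte, invert the rest.
  static byte[] writeReverseOrderedLong(long l) {
    byte[] b = new byte[8];
    b[0] = (byte) (0x7f ^ ((l >> 56) & 0xff));
    for (int i = 1; i < 8; i++) {
      b[i] = (byte) (0xff ^ ((l >> (8 * (7 - i))) & 0xff));
    }
    return b;
  }

  // Unsigned byte-wise comparison, matching LevelDB's default comparator.
  static int compareUnsigned(byte[] a, byte[] b) {
    for (int i = 0; i < a.length && i < b.length; i++) {
      int d = (a[i] & 0xff) - (b[i] & 0xff);
      if (d != 0) {
        return d;
      }
    }
    return a.length - b.length;
  }

  public static void main(String[] args) {
    byte[] older = writeReverseOrderedLong(1000L);
    byte[] newer = writeReverseOrderedLong(2000L);
    // Prints true: the newer timestamp sorts first, so iteration over
    // start-time-prefixed keys visits the newest entities first.
    System.out.println(compareUnsigned(newer, older) < 0);
  }
}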

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java
new file mode 100644
index 0000000..86ac1f8
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+
+/**
+ * In-memory implementation of {@link TimelineStore}. This implementation is
+ * for test purposes only: all data is held on the heap of a single instance,
+ * so reads and writes issued against separately instantiated stores will see
+ * different history data.
+ */
+@Private
+@Unstable
+public class MemoryTimelineStore
+    extends AbstractService implements TimelineStore {
+
+  private Map<EntityIdentifier, TimelineEntity> entities =
+      new HashMap<EntityIdentifier, TimelineEntity>();
+  private Map<EntityIdentifier, Long> entityInsertTimes =
+      new HashMap<EntityIdentifier, Long>();
+
+  public MemoryTimelineStore() {
+    super(MemoryTimelineStore.class.getName());
+  }
+
+  @Override
+  public TimelineEntities getEntities(String entityType, Long limit,
+      Long windowStart, Long windowEnd, String fromId, Long fromTs,
+      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+      EnumSet<Field> fields) {
+    if (limit == null) {
+      limit = DEFAULT_LIMIT;
+    }
+    if (windowStart == null) {
+      windowStart = Long.MIN_VALUE;
+    }
+    if (windowEnd == null) {
+      windowEnd = Long.MAX_VALUE;
+    }
+    if (fields == null) {
+      fields = EnumSet.allOf(Field.class);
+    }
+
+    Iterator<TimelineEntity> entityIterator = null;
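+    // fromId paging: begin iteration at the given entity, relying on
+    // TimelineEntity's natural ordering (entity type, then newest start
+    // time first) to continue toward older entities.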
+    if (fromId != null) {
+      TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId,
+          entityType));
+      if (firstEntity == null) {
+        return new TimelineEntities();
+      } else {
+        entityIterator = new TreeSet<TimelineEntity>(entities.values())
+            .tailSet(firstEntity, true).iterator();
+      }
+    }
+    if (entityIterator == null) {
+      entityIterator = new PriorityQueue<TimelineEntity>(entities.values())
+          .iterator();
+    }
+
+    List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();
+    while (entityIterator.hasNext()) {
+      TimelineEntity entity = entityIterator.next();
+      if (entitiesSelected.size() >= limit) {
+        break;
+      }
+      if (!entity.getEntityType().equals(entityType)) {
+        continue;
+      }
+      if (entity.getStartTime() <= windowStart) {
+        continue;
+      }
+      if (entity.getStartTime() > windowEnd) {
+        continue;
+      }
+      if (fromTs != null && entityInsertTimes.get(new EntityIdentifier(
+          entity.getEntityId(), entity.getEntityType())) > fromTs) {
+        continue;
+      }
+      if (primaryFilter != null &&
+          !matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) {
+        continue;
+      }
+      if (secondaryFilters != null) { // AND logic
+        boolean flag = true;
+        for (NameValuePair secondaryFilter : secondaryFilters) {
+          if (secondaryFilter != null && !matchPrimaryFilter(
+              entity.getPrimaryFilters(), secondaryFilter) &&
+              !matchFilter(entity.getOtherInfo(), secondaryFilter)) {
+            flag = false;
+            break;
+          }
+        }
+        if (!flag) {
+          continue;
+        }
+      }
+      entitiesSelected.add(entity);
+    }
+    List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
+    for (TimelineEntity entitySelected : entitiesSelected) {
+      entitiesToReturn.add(maskFields(entitySelected, fields));
+    }
+    Collections.sort(entitiesToReturn);
+    TimelineEntities entitiesWrapper = new TimelineEntities();
+    entitiesWrapper.setEntities(entitiesToReturn);
+    return entitiesWrapper;
+  }
+
+  @Override
+  public TimelineEntity getEntity(String entityId, String entityType,
+      EnumSet<Field> fieldsToRetrieve) {
+    if (fieldsToRetrieve == null) {
+      fieldsToRetrieve = EnumSet.allOf(Field.class);
+    }
+    TimelineEntity entity = entities.get(new EntityIdentifier(entityId, entityType));
+    if (entity == null) {
+      return null;
+    } else {
+      return maskFields(entity, fieldsToRetrieve);
+    }
+  }
+
+  @Override
+  public TimelineEvents getEntityTimelines(String entityType,
+      SortedSet<String> entityIds, Long limit, Long windowStart,
+      Long windowEnd, Set<String> eventTypes) {
+    TimelineEvents allEvents = new TimelineEvents();
+    if (entityIds == null) {
+      return allEvents;
+    }
+    if (limit == null) {
+      limit = DEFAULT_LIMIT;
+    }
+    if (windowStart == null) {
+      windowStart = Long.MIN_VALUE;
+    }
+    if (windowEnd == null) {
+      windowEnd = Long.MAX_VALUE;
+    }
+    for (String entityId : entityIds) {
+      EntityIdentifier entityID = new EntityIdentifier(entityId, entityType);
+      TimelineEntity entity = entities.get(entityID);
+      if (entity == null) {
+        continue;
+      }
+      EventsOfOneEntity events = new EventsOfOneEntity();
+      events.setEntityId(entityId);
+      events.setEntityType(entityType);
+      for (TimelineEvent event : entity.getEvents()) {
+        if (events.getEvents().size() >= limit) {
+          break;
+        }
+        if (event.getTimestamp() <= windowStart) {
+          continue;
+        }
+        if (event.getTimestamp() > windowEnd) {
+          continue;
+        }
+        if (eventTypes != null && !eventTypes.contains(event.getEventType())) {
+          continue;
+        }
+        events.addEvent(event);
+      }
+      allEvents.addEvent(events);
+    }
+    return allEvents;
+  }
+
+  @Override
+  public TimelinePutResponse put(TimelineEntities data) {
+    TimelinePutResponse response = new TimelinePutResponse();
+    for (TimelineEntity entity : data.getEntities()) {
+      EntityIdentifier entityId =
+          new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
+      // store entity info in memory
+      TimelineEntity existingEntity = entities.get(entityId);
+      if (existingEntity == null) {
+        existingEntity = new TimelineEntity();
+        existingEntity.setEntityId(entity.getEntityId());
+        existingEntity.setEntityType(entity.getEntityType());
+        existingEntity.setStartTime(entity.getStartTime());
+        entities.put(entityId, existingEntity);
+        entityInsertTimes.put(entityId, System.currentTimeMillis());
+      }
+      if (entity.getEvents() != null) {
+        if (existingEntity.getEvents() == null) {
+          existingEntity.setEvents(entity.getEvents());
+        } else {
+          existingEntity.addEvents(entity.getEvents());
+        }
+        Collections.sort(existingEntity.getEvents());
+      }
+      // check startTime
+      if (existingEntity.getStartTime() == null) {
+        if (existingEntity.getEvents() == null
+            || existingEntity.getEvents().isEmpty()) {
+          TimelinePutError error = new TimelinePutError();
+          error.setEntityId(entityId.getId());
+          error.setEntityType(entityId.getType());
+          error.setErrorCode(TimelinePutError.NO_START_TIME);
+          response.addError(error);
+          entities.remove(entityId);
+          entityInsertTimes.remove(entityId);
+          continue;
+        } else {
+          Long min = Long.MAX_VALUE;
+          for (TimelineEvent e : entity.getEvents()) {
+            if (min > e.getTimestamp()) {
+              min = e.getTimestamp();
+            }
+          }
+          existingEntity.setStartTime(min);
+        }
+      }
+      if (entity.getPrimaryFilters() != null) {
+        if (existingEntity.getPrimaryFilters() == null) {
+          existingEntity.setPrimaryFilters(new HashMap<String, Set<Object>>());
+        }
+        for (Entry<String, Set<Object>> pf :
+            entity.getPrimaryFilters().entrySet()) {
+          for (Object pfo : pf.getValue()) {
+            existingEntity.addPrimaryFilter(pf.getKey(), maybeConvert(pfo));
+          }
+        }
+      }
+      if (entity.getOtherInfo() != null) {
+        if (existingEntity.getOtherInfo() == null) {
+          existingEntity.setOtherInfo(new HashMap<String, Object>());
+        }
+        for (Entry<String, Object> info : entity.getOtherInfo().entrySet()) {
+          existingEntity.addOtherInfo(info.getKey(),
+              maybeConvert(info.getValue()));
+        }
+      }
+      // relate it to other entities
+      if (entity.getRelatedEntities() == null) {
+        continue;
+      }
+      for (Map.Entry<String, Set<String>> partRelatedEntities : entity
+          .getRelatedEntities().entrySet()) {
+        if (partRelatedEntities == null) {
+          continue;
+        }
+        for (String idStr : partRelatedEntities.getValue()) {
+          EntityIdentifier relatedEntityId =
+              new EntityIdentifier(idStr, partRelatedEntities.getKey());
+          TimelineEntity relatedEntity = entities.get(relatedEntityId);
+          if (relatedEntity != null) {
+            relatedEntity.addRelatedEntity(
+                existingEntity.getEntityType(), existingEntity.getEntityId());
+          } else {
+            relatedEntity = new TimelineEntity();
+            relatedEntity.setEntityId(relatedEntityId.getId());
+            relatedEntity.setEntityType(relatedEntityId.getType());
+            relatedEntity.setStartTime(existingEntity.getStartTime());
+            relatedEntity.addRelatedEntity(existingEntity.getEntityType(),
+                existingEntity.getEntityId());
+            entities.put(relatedEntityId, relatedEntity);
+            entityInsertTimes.put(relatedEntityId, System.currentTimeMillis());
+          }
+        }
+      }
+    }
+    return response;
+  }
+
+  private static TimelineEntity maskFields(
+      TimelineEntity entity, EnumSet<Field> fields) {
+    // Conceal the fields that are not going to be exposed
+    TimelineEntity entityToReturn = new TimelineEntity();
+    entityToReturn.setEntityId(entity.getEntityId());
+    entityToReturn.setEntityType(entity.getEntityType());
+    entityToReturn.setStartTime(entity.getStartTime());
+    // Avoid get(0) on an entity that has no events (e.g. one created
+    // implicitly as a related entity) when only LAST_EVENT_ONLY is set.
+    List<TimelineEvent> events = entity.getEvents();
+    entityToReturn.setEvents(fields.contains(Field.EVENTS) ? events
+        : fields.contains(Field.LAST_EVENT_ONLY) && events != null
+            && !events.isEmpty() ? Arrays.asList(events.get(0)) : null);
+    entityToReturn.setRelatedEntities(fields.contains(Field.RELATED_ENTITIES) ?
+        entity.getRelatedEntities() : null);
+    entityToReturn.setPrimaryFilters(fields.contains(Field.PRIMARY_FILTERS) ?
+        entity.getPrimaryFilters() : null);
+    entityToReturn.setOtherInfo(fields.contains(Field.OTHER_INFO) ?
+        entity.getOtherInfo() : null);
+    return entityToReturn;
+  }
+
+  private static boolean matchFilter(Map<String, Object> tags,
+      NameValuePair filter) {
+    Object value = tags.get(filter.getName());
+    if (value == null) { // doesn't have the filter
+      return false;
+    } else if (!value.equals(filter.getValue())) { // doesn't match the filter
+      return false;
+    }
+    return true;
+  }
+
+  private static boolean matchPrimaryFilter(Map<String, Set<Object>> tags,
+      NameValuePair filter) {
+    Set<Object> value = tags.get(filter.getName());
+    if (value == null) { // doesn't have the filter
+      return false;
+    } else {
+      return value.contains(filter.getValue());
+    }
+  }
+
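+  // Likely rationale: narrow Longs in the int range to Integer so values
+  // stored directly compare equal to the same values after a JSON round
+  // trip, where small numbers typically deserialize as Integer.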
+  private static Object maybeConvert(Object o) {
+    if (o instanceof Long) {
+      Long l = (Long)o;
+      if (l >= Integer.MIN_VALUE && l <= Integer.MAX_VALUE) {
+        return l.intValue();
+      }
+    }
+    return o;
+  }
+
+}
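
A hedged usage sketch of the store above, showing the put/get round trip and
the derived start time; MemoryStoreDemo and the "app_0001"/"APPLICATION"
identifiers are illustrative assumptions, not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;

public class MemoryStoreDemo {
  public static void main(String[] args) throws Exception {
    MemoryTimelineStore store = new MemoryTimelineStore();
    store.init(new Configuration());
    store.start();
    try {
      TimelineEntity entity = new TimelineEntity();
      entity.setEntityId("app_0001");
      entity.setEntityType("APPLICATION");
      // No explicit start time: put() falls back to the earliest event
      // timestamp, or reports NO_START_TIME if there are no events either.
      TimelineEvent started = new TimelineEvent();
      started.setEventType("STARTED");
      started.setTimestamp(System.currentTimeMillis());
      entity.addEvent(started);

      TimelineEntities batch = new TimelineEntities();
      batch.addEntity(entity);
      TimelinePutResponse response = store.put(batch);
      System.out.println("put errors: " + response.getErrors().size());

      // A null field set means "retrieve all fields" (see maskFields).
      System.out.println(store.getEntity("app_0001", "APPLICATION", null));
    } finally {
      store.stop();
    }
  }
}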

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/NameValuePair.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/NameValuePair.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/NameValuePair.java
new file mode 100644
index 0000000..d8dabd2
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/NameValuePair.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A class holding a name and value pair, used for specifying filters in
+ * {@link TimelineReader}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class NameValuePair {
+  String name;
+  Object value;
+
+  public NameValuePair(String name, Object value) {
+    this.name = name;
+    this.value = value;
+  }
+
+  /**
+   * Get the name.
+   * @return The name.
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Get the value.
+   * @return The value.
+   */
+  public Object getValue() {
+    return value;
+  }
+
+  @Override
+  public String toString() {
+    return "{ name: " + name + ", value: " + value + " }";
+  }
+}
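
One subtlety when building filters against the in-memory store above:
MemoryTimelineStore's maybeConvert() narrows stored Long values in the int
range to Integer, and filter matching uses Object.equals(). A hedged
illustration (the "user" filter name is invented here):

// Matches entities stored with the value 42, whether it arrived as an
// Integer or as a Long that was narrowed on put.
NameValuePair matching = new NameValuePair("user", 42);

// Never matches in the memory store: the stored Integer 42 is not equal
// to the Long 42L under Object.equals(), so this filter selects nothing.
NameValuePair notMatching = new NameValuePair("user", 42L);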

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java
new file mode 100644
index 0000000..9ae9954
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.SortedSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+
+/**
+ * This interface is for retrieving timeline information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface TimelineReader {
+
+  /**
+   * Possible fields to retrieve for {@link #getEntities} and
+   * {@link #getEntity}.
+   */
+  enum Field {
+    EVENTS,
+    RELATED_ENTITIES,
+    PRIMARY_FILTERS,
+    OTHER_INFO,
+    LAST_EVENT_ONLY
+  }
+
+  /**
+   * Default limit for {@link #getEntities} and {@link #getEntityTimelines}.
+   */
+  final long DEFAULT_LIMIT = 100;
+
+  /**
+   * This method retrieves a list of entity information, {@link TimelineEntity},
+   * sorted by the starting timestamp for the entity, descending. The starting
+   * timestamp of an entity is a timestamp specified by the client. If it is not
+   * explicitly specified, it will be chosen by the store to be the earliest
+   * timestamp of the events received in the first put for the entity.
+   * 
+   * @param entityType
+   *          The type of entities to return (required).
+   * @param limit
+   *          A limit on the number of entities to return. If null, defaults to
+   *          {@link #DEFAULT_LIMIT}.
+   * @param windowStart
+   *          The earliest start timestamp to retrieve (exclusive). If null,
+   *          defaults to retrieving all entities until the limit is reached.
+   * @param windowEnd
+   *          The latest start timestamp to retrieve (inclusive). If null,
+   *          defaults to {@link Long#MAX_VALUE}.
+   * @param fromId
+   *          If fromId is not null, retrieve entities earlier than and
+   *          including the specified ID. If no start time is found for the
+   *          specified ID, an empty list of entities will be returned. The
+   *          windowEnd parameter will take precedence if the start time of this
+   *          entity falls later than windowEnd.
+   * @param fromTs
+   *          If fromTs is not null, ignore entities that were inserted into the
+   *          store after the given timestamp. The entity's insert timestamp
+   *          used for this comparison is the store's system time when the first
+   *          put for the entity was received (not the entity's start time).
+   * @param primaryFilter
+   *          Retrieves only entities that have the specified primary filter.
+   *          If null, retrieves all entities. This is an indexed retrieval:
+   *          entities that do not match the filter are never scanned.
+   * @param secondaryFilters
+   *          Retrieves only entities that have exact matches for all the
+   *          specified filters in their primary filters or other info. This is
+   *          not an indexed retrieval, so all entities are scanned but only
+   *          those matching the filters are returned.
+   * @param fieldsToRetrieve
+   *          Specifies which fields of the entity object to retrieve (see
+   *          {@link Field}). If the set of fields contains
+   *          {@link Field#LAST_EVENT_ONLY} and not {@link Field#EVENTS}, the
+   *          most recent event for each entity is retrieved. If null, retrieves
+   *          all fields.
+   * @return A {@link TimelineEntities} object.
+   * @throws IOException
+   */
+  TimelineEntities getEntities(String entityType,
+      Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs,
+      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+      EnumSet<Field> fieldsToRetrieve) throws IOException;
+
+  /**
+   * This method retrieves the entity information for a given entity.
+   * 
+   * @param entityId
+   *          The entity whose information will be retrieved.
+   * @param entityType
+   *          The type of the entity.
+   * @param fieldsToRetrieve
+   *          Specifies which fields of the entity object to retrieve (see
+   *          {@link Field}). If the set of fields contains
+   *          {@link Field#LAST_EVENT_ONLY} and not {@link Field#EVENTS}, the
+   *          most recent event for each entity is retrieved. If null, retrieves
+   *          all fields.
+   * @return A {@link TimelineEntity} object.
+   * @throws IOException
+   */
+  TimelineEntity getEntity(String entityId, String entityType,
+      EnumSet<Field> fieldsToRetrieve) throws IOException;
+
+  /**
+   * This method retrieves the events for a list of entities all of the same
+   * entity type. The events for each entity are sorted in order of their
+   * timestamps, descending.
+   * 
+   * @param entityType
+   *          The type of entities to retrieve events for.
+   * @param entityIds
+   *          The entity IDs to retrieve events for.
+   * @param limit
+   *          A limit on the number of events to return for each entity. If
+   *          null, defaults to {@link #DEFAULT_LIMIT} events per entity.
+   * @param windowStart
+   *          If not null, retrieves only events later than the given time
+   *          (exclusive)
+   * @param windowEnd
+   *          If not null, retrieves only events earlier than the given time
+   *          (inclusive)
+   * @param eventTypes
+   *          Restricts the events returned to the given types. If null, events
+   *          of all types will be returned.
+   * @return A {@link TimelineEvents} object.
+   * @throws IOException
+   */
+  TimelineEvents getEntityTimelines(String entityType,
+      SortedSet<String> entityIds, Long limit, Long windowStart,
+      Long windowEnd, Set<String> eventTypes) throws IOException;
+}
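
To make the parameter list above concrete, here is a hedged sketch of an
indexed primary-filter query; PrimaryFilterQueryDemo, appsForUser, and the
"user"/"APPLICATION" values are illustrative assumptions.

import java.io.IOException;
import java.util.EnumSet;

import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;

public class PrimaryFilterQueryDemo {

  // Fetch up to ten APPLICATION entities whose "user" primary filter
  // equals the given value, sorted newest start time first.
  static TimelineEntities appsForUser(TimelineReader reader, String user)
      throws IOException {
    return reader.getEntities(
        "APPLICATION",                   // entityType (required)
        10L,                             // limit
        null, null,                      // windowStart/windowEnd: unbounded
        null, null,                      // fromId/fromTs: no paging
        new NameValuePair("user", user), // indexed primary filter
        null,                            // no secondary (scan) filters
        EnumSet.allOf(TimelineReader.Field.class)); // all fields
  }
}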

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStore.java
new file mode 100644
index 0000000..6b50d83
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStore.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.Service;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface TimelineStore extends
+    Service, TimelineReader, TimelineWriter {
+}
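
Since TimelineStore only composes Service with the reader and writer
interfaces, a pluggable backend reduces to a single class. A hedged no-op
skeleton follows; NoopTimelineStore is a name invented here for illustration,
not part of the patch.

import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Set;
import java.util.SortedSet;

import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;

public class NoopTimelineStore extends AbstractService
    implements TimelineStore {

  public NoopTimelineStore() {
    super(NoopTimelineStore.class.getName());
  }

  @Override
  public TimelineEntities getEntities(String entityType, Long limit,
      Long windowStart, Long windowEnd, String fromId, Long fromTs,
      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
      EnumSet<Field> fieldsToRetrieve) throws IOException {
    return new TimelineEntities(); // always empty
  }

  @Override
  public TimelineEntity getEntity(String entityId, String entityType,
      EnumSet<Field> fieldsToRetrieve) throws IOException {
    return null; // no entity is ever known
  }

  @Override
  public TimelineEvents getEntityTimelines(String entityType,
      SortedSet<String> entityIds, Long limit, Long windowStart,
      Long windowEnd, Set<String> eventTypes) throws IOException {
    return new TimelineEvents(); // no events
  }

  @Override
  public TimelinePutResponse put(TimelineEntities data) throws IOException {
    return new TimelinePutResponse(); // accept and drop everything
  }
}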

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineWriter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineWriter.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineWriter.java
new file mode 100644
index 0000000..8f28d82
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineWriter.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
+import java.io.IOException;
+
+/**
+ * This interface is for storing timeline information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface TimelineWriter {
+
+  /**
+   * Stores entity information to the timeline store. Any errors occurring for
+   * individual put request objects will be reported in the response.
+   * 
+   * @param data
+   *          A {@link TimelineEntities} object.
+   * @return A {@link TimelinePutResponse} object.
+   * @throws IOException
+   */
+  TimelinePutResponse put(TimelineEntities data) throws IOException;
+
+}
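
Callers are expected to inspect the response for per-entity errors rather
than rely on exceptions, which signal store-level failures. A hedged sketch
of that pattern; PutErrorHandling and putAndReport are names invented here.

import java.io.IOException;

import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;

public class PutErrorHandling {

  // Put a batch and report any per-entity rejections.
  static void putAndReport(TimelineWriter writer, TimelineEntities entities)
      throws IOException {
    TimelinePutResponse response = writer.put(entities);
    for (TimelinePutError error : response.getErrors()) {
      if (error.getErrorCode() == TimelinePutError.NO_START_TIME) {
        // The entity had neither an explicit start time nor any events.
        System.err.println("Rejected " + error.getEntityType() + "/"
            + error.getEntityId() + ": no start time could be derived");
      }
    }
  }
}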

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/package-info.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/package-info.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/package-info.java
new file mode 100644
index 0000000..970e868
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/package-info.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+import org.apache.hadoop.classification.InterfaceAudience;