You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2016/01/20 10:13:21 UTC

[04/50] [abbrv] hadoop git commit: YARN-3049. [Storage Implementation] Implement storage reader interface to fetch raw data from HBase backend (Zhijie Shen via sjlee)

YARN-3049. [Storage Implementation] Implement storage reader interface to fetch raw data from HBase backend (Zhijie Shen via sjlee)

(cherry picked from commit 07433c2ad52df9e844dbd90020c277d3df844dcd)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0bed3fb3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0bed3fb3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0bed3fb3

Branch: refs/heads/feature-YARN-2928
Commit: 0bed3fb3b3793fc0fc4b838c024cf5da7c2cc291
Parents: 2d97f86
Author: Sangjin Lee <sj...@apache.org>
Authored: Fri Aug 7 10:00:22 2015 -0700
Committer: Li Lu <gt...@apache.org>
Committed: Tue Jan 19 17:37:46 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../dev-support/findbugs-exclude.xml            |   5 +
 .../records/timelineservice/TimelineEntity.java |   9 +-
 .../storage/FileSystemTimelineReaderImpl.java   | 164 +++----
 .../storage/HBaseTimelineReaderImpl.java        | 424 +++++++++++++++++++
 .../storage/HBaseTimelineWriterImpl.java        |  43 +-
 .../storage/TimelineSchemaCreator.java          |  12 +
 .../storage/apptoflow/AppToFlowColumn.java      | 126 ++++++
 .../apptoflow/AppToFlowColumnFamily.java        |  51 +++
 .../storage/apptoflow/AppToFlowRowKey.java      |  39 ++
 .../storage/apptoflow/AppToFlowTable.java       | 110 +++++
 .../storage/apptoflow/package-info.java         |  23 +
 .../storage/common/BaseTable.java               |  16 +
 .../storage/common/ColumnPrefix.java            |   2 +-
 .../common/TimelineEntitySchemaConstants.java   |  68 ---
 .../common/TimelineHBaseSchemaConstants.java    |  68 +++
 .../storage/common/TimelineReaderUtils.java     | 112 +++++
 .../storage/entity/EntityColumn.java            |   2 +-
 .../storage/entity/EntityColumnFamily.java      |   2 +-
 .../storage/entity/EntityColumnPrefix.java      |   2 +-
 .../storage/entity/EntityRowKey.java            |  36 +-
 .../storage/entity/EntityTable.java             |   8 +-
 .../storage/TestHBaseTimelineWriterImpl.java    |  82 +++-
 23 files changed, 1197 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index f40cfe8..0378df8 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -82,6 +82,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3949. Ensure timely flush of timeline writes. (Sangjin Lee via
     junping_du)
 
+    YARN-3049. [Storage Implementation] Implement storage reader interface to
+    fetch raw data from HBase backend (Zhijie Shen via sjlee)
+
   IMPROVEMENTS
 
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 7505645..0dcdd15 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -522,4 +522,9 @@
     <Package name="org.apache.hadoop.yarn.api.records.impl.pb" />
     <Bug pattern="NP_BOOLEAN_RETURN_NULL" />
   </Match>
+  <!-- Object cast is based on the event type -->
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.resourcemanager.metrics.AbstractTimelineServicePublisher" />
+     <Bug pattern="BC_UNCONFIRMED_CAST" />
+  </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
index 9ef2d90..0701001 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
@@ -29,7 +29,9 @@ import javax.xml.bind.annotation.XmlRootElement;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
+import java.util.TreeSet;
 
 /**
  * The basic timeline entity data structure for timeline service v2. Timeline
@@ -133,7 +135,8 @@ public class TimelineEntity implements Comparable<TimelineEntity> {
   private HashMap<String, Object> info = new HashMap<>();
   private HashMap<String, String> configs = new HashMap<>();
   private Set<TimelineMetric> metrics = new HashSet<>();
-  private Set<TimelineEvent> events = new HashSet<>();
+  // events should be sorted by timestamp in descending order
+  private NavigableSet<TimelineEvent> events = new TreeSet<>();
   private HashMap<String, Set<String>> isRelatedToEntities = new HashMap<>();
   private HashMap<String, Set<String>> relatesToEntities = new HashMap<>();
   private long createdTime;
@@ -334,7 +337,7 @@ public class TimelineEntity implements Comparable<TimelineEntity> {
   }
 
   @XmlElement(name = "events")
-  public Set<TimelineEvent> getEvents() {
+  public NavigableSet<TimelineEvent> getEvents() {
     if (real == null) {
       return events;
     } else {
@@ -342,7 +345,7 @@ public class TimelineEntity implements Comparable<TimelineEntity> {
     }
   }
 
-  public void setEvents(Set<TimelineEvent> events) {
+  public void setEvents(NavigableSet<TimelineEvent> events) {
     if (real == null) {
       this.events = events;
     } else {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
index f9f1d1d..45ddd1d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
 import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.map.JsonMappingException;
@@ -119,59 +120,32 @@ public class FileSystemTimelineReaderImpl extends AbstractService
   private static void fillFields(TimelineEntity finalEntity,
       TimelineEntity real, EnumSet<Field> fields) {
     if (fields.contains(Field.ALL)) {
-      finalEntity.setConfigs(real.getConfigs());
-      finalEntity.setMetrics(real.getMetrics());
-      finalEntity.setInfo(real.getInfo());
-      finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
-      finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
-      finalEntity.setEvents(real.getEvents());
-      return;
+      fields = EnumSet.allOf(Field.class);
     }
     for (Field field : fields) {
       switch(field) {
-      case CONFIGS:
-        finalEntity.setConfigs(real.getConfigs());
-        break;
-      case METRICS:
-        finalEntity.setMetrics(real.getMetrics());
-        break;
-      case INFO:
-        finalEntity.setInfo(real.getInfo());
-        break;
-      case IS_RELATED_TO:
-        finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
-        break;
-      case RELATES_TO:
-        finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
-        break;
-      case EVENTS:
-        finalEntity.setEvents(real.getEvents());
-        break;
-      default:
-        continue;
-      }
-    }
-  }
-
-  private static boolean matchFilter(Object infoValue, Object filterValue) {
-    return infoValue.equals(filterValue);
-  }
-
-  private static boolean matchFilters(Map<String, ? extends Object> entityInfo,
-      Map<String, ? extends Object> filters) {
-    if (entityInfo == null || entityInfo.isEmpty()) {
-      return false;
-    }
-    for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
-      Object infoValue = entityInfo.get(filter.getKey());
-      if (infoValue == null) {
-        return false;
-      }
-      if (!matchFilter(infoValue, filter.getValue())) {
-        return false;
+        case CONFIGS:
+          finalEntity.setConfigs(real.getConfigs());
+          break;
+        case METRICS:
+          finalEntity.setMetrics(real.getMetrics());
+          break;
+        case INFO:
+          finalEntity.setInfo(real.getInfo());
+          break;
+        case IS_RELATED_TO:
+          finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
+          break;
+        case RELATES_TO:
+          finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
+          break;
+        case EVENTS:
+          finalEntity.setEvents(real.getEvents());
+          break;
+        default:
+          continue;
       }
     }
-    return true;
   }
 
   private String getFlowRunPath(String userId, String clusterId, String flowId,
@@ -186,10 +160,10 @@ public class FileSystemTimelineReaderImpl extends AbstractService
     String appFlowMappingFile = rootPath + "/" +  ENTITIES_DIR + "/" +
         clusterId + "/" + APP_FLOW_MAPPING_FILE;
     try (BufferedReader reader =
-        new BufferedReader(new InputStreamReader(
-            new FileInputStream(
-                appFlowMappingFile), Charset.forName("UTF-8")));
-        CSVParser parser = new CSVParser(reader, csvFormat)) {
+             new BufferedReader(new InputStreamReader(
+                 new FileInputStream(
+                     appFlowMappingFile), Charset.forName("UTF-8")));
+         CSVParser parser = new CSVParser(reader, csvFormat)) {
       for (CSVRecord record : parser.getRecords()) {
         if (record.size() < 4) {
           continue;
@@ -207,36 +181,6 @@ public class FileSystemTimelineReaderImpl extends AbstractService
     throw new IOException("Unable to get flow info");
   }
 
-  private static boolean matchMetricFilters(Set<TimelineMetric> metrics,
-      Set<String> metricFilters) {
-    Set<String> tempMetrics = new HashSet<String>();
-    for (TimelineMetric metric : metrics) {
-      tempMetrics.add(metric.getId());
-    }
-
-    for (String metricFilter : metricFilters) {
-      if (!tempMetrics.contains(metricFilter)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
-      Set<String> eventFilters) {
-    Set<String> tempEvents = new HashSet<String>();
-    for (TimelineEvent event : entityEvents) {
-      tempEvents.add(event.getId());
-    }
-
-    for (String eventFilter : eventFilters) {
-      if (!tempEvents.contains(eventFilter)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
   private static TimelineEntity createEntityToBeReturned(TimelineEntity entity,
       EnumSet<Field> fieldsToRetrieve) {
     TimelineEntity entityToBeReturned = new TimelineEntity();
@@ -254,23 +198,6 @@ public class FileSystemTimelineReaderImpl extends AbstractService
     return (time >= timeBegin) && (time <= timeEnd);
   }
 
-  private static boolean matchRelations(
-      Map<String, Set<String>> entityRelations,
-      Map<String, Set<String>> relations) {
-    for (Map.Entry<String, Set<String>> relation : relations.entrySet()) {
-      Set<String> ids = entityRelations.get(relation.getKey());
-      if (ids == null) {
-        return false;
-      }
-      for (String id : relation.getValue()) {
-        if (!ids.contains(id)) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
   private static void mergeEntities(TimelineEntity entity1,
       TimelineEntity entity2) {
     // Ideally created time wont change except in the case of issue from client.
@@ -364,22 +291,22 @@ public class FileSystemTimelineReaderImpl extends AbstractService
     // First sort the selected entities based on created/start time.
     Map<Long, Set<TimelineEntity>> sortedEntities =
         new TreeMap<>(
-          new Comparator<Long>() {
-            @Override
-            public int compare(Long l1, Long l2) {
-              return l2.compareTo(l1);
+            new Comparator<Long>() {
+              @Override
+              public int compare(Long l1, Long l2) {
+                return l2.compareTo(l1);
+              }
             }
-          }
         );
     for (File entityFile : dir.listFiles()) {
       if (!entityFile.getName().contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) {
         continue;
       }
       try (BufferedReader reader =
-          new BufferedReader(
-              new InputStreamReader(
-                  new FileInputStream(
-                      entityFile), Charset.forName("UTF-8")))) {
+               new BufferedReader(
+                   new InputStreamReader(
+                       new FileInputStream(
+                           entityFile), Charset.forName("UTF-8")))) {
         TimelineEntity entity = readEntityFromFile(reader);
         if (!entity.getType().equals(entityType)) {
           continue;
@@ -393,27 +320,32 @@ public class FileSystemTimelineReaderImpl extends AbstractService
           continue;
         }
         if (relatesTo != null && !relatesTo.isEmpty() &&
-            !matchRelations(entity.getRelatesToEntities(), relatesTo)) {
+            !TimelineReaderUtils
+                .matchRelations(entity.getRelatesToEntities(), relatesTo)) {
           continue;
         }
         if (isRelatedTo != null && !isRelatedTo.isEmpty() &&
-            !matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) {
+            !TimelineReaderUtils
+                .matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) {
           continue;
         }
         if (infoFilters != null && !infoFilters.isEmpty() &&
-            !matchFilters(entity.getInfo(), infoFilters)) {
+            !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
           continue;
         }
         if (configFilters != null && !configFilters.isEmpty() &&
-            !matchFilters(entity.getConfigs(), configFilters)) {
+            !TimelineReaderUtils.matchFilters(
+                entity.getConfigs(), configFilters)) {
           continue;
         }
         if (metricFilters != null && !metricFilters.isEmpty() &&
-            !matchMetricFilters(entity.getMetrics(), metricFilters)) {
+            !TimelineReaderUtils.matchMetricFilters(
+                entity.getMetrics(), metricFilters)) {
           continue;
         }
         if (eventFilters != null && !eventFilters.isEmpty() &&
-            !matchEventFilters(entity.getEvents(), eventFilters)) {
+            !TimelineReaderUtils.matchEventFilters(
+                entity.getEvents(), eventFilters)) {
           continue;
         }
         TimelineEntity entityToBeReturned =
@@ -461,8 +393,8 @@ public class FileSystemTimelineReaderImpl extends AbstractService
     File entityFile =
         new File(dir, entityId + TIMELINE_SERVICE_STORAGE_EXTENSION);
     try (BufferedReader reader =
-        new BufferedReader(new InputStreamReader(
-            new FileInputStream(entityFile), Charset.forName("UTF-8")))) {
+             new BufferedReader(new InputStreamReader(
+                 new FileInputStream(entityFile), Charset.forName("UTF-8")))) {
       TimelineEntity entity = readEntityFromFile(reader);
       return createEntityToBeReturned(entity, fieldsToRetrieve);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
new file mode 100644
index 0000000..5258b9c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+public class HBaseTimelineReaderImpl
+    extends AbstractService implements TimelineReader {
+
+  private static final Log LOG = LogFactory
+      .getLog(HBaseTimelineReaderImpl.class);
+  private static final long DEFAULT_BEGIN_TIME = 0L;
+  private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
+
+  private Configuration hbaseConf = null;
+  private Connection conn;
+  private EntityTable entityTable;
+  private AppToFlowTable appToFlowTable;
+
+  public HBaseTimelineReaderImpl() {
+    super(HBaseTimelineReaderImpl.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    hbaseConf = HBaseConfiguration.create(conf);
+    conn = ConnectionFactory.createConnection(hbaseConf);
+    entityTable = new EntityTable();
+    appToFlowTable = new AppToFlowTable();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (conn != null) {
+      LOG.info("closing the hbase Connection");
+      conn.close();
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public TimelineEntity getEntity(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve)
+      throws IOException {
+    validateParams(userId, clusterId, appId, entityType, entityId, true);
+    // In reality both should be null or neither should be null
+    if (flowId == null || flowRunId == null) {
+      FlowContext context = lookupFlowContext(clusterId, appId);
+      flowId = context.flowId;
+      flowRunId = context.flowRunId;
+    }
+    if (fieldsToRetrieve == null) {
+      fieldsToRetrieve = EnumSet.noneOf(Field.class);
+    }
+
+    byte[] rowKey = EntityRowKey.getRowKey(
+        clusterId, userId, flowId, flowRunId, appId, entityType, entityId);
+    Get get = new Get(rowKey);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    return parseEntity(
+        entityTable.getResult(hbaseConf, conn, get), fieldsToRetrieve,
+        false, DEFAULT_BEGIN_TIME, DEFAULT_END_TIME, false, DEFAULT_BEGIN_TIME,
+        DEFAULT_END_TIME, null, null, null, null, null, null);
+  }
+
+  @Override
+  public Set<TimelineEntity> getEntities(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) throws IOException {
+    validateParams(userId, clusterId, appId, entityType, null, false);
+    // In reality both should be null or neither should be null
+    if (flowId == null || flowRunId == null) {
+      FlowContext context = lookupFlowContext(clusterId, appId);
+      flowId = context.flowId;
+      flowRunId = context.flowRunId;
+    }
+    if (limit == null) {
+      limit = TimelineReader.DEFAULT_LIMIT;
+    }
+    if (createdTimeBegin == null) {
+      createdTimeBegin = DEFAULT_BEGIN_TIME;
+    }
+    if (createdTimeEnd == null) {
+      createdTimeEnd = DEFAULT_END_TIME;
+    }
+    if (modifiedTimeBegin == null) {
+      modifiedTimeBegin = DEFAULT_BEGIN_TIME;
+    }
+    if (modifiedTimeEnd == null) {
+      modifiedTimeEnd = DEFAULT_END_TIME;
+    }
+    if (fieldsToRetrieve == null) {
+      fieldsToRetrieve = EnumSet.noneOf(Field.class);
+    }
+
+    NavigableSet<TimelineEntity> entities = new TreeSet<>();
+    // Scan through part of the table to find the entities belong to one app and
+    // one type
+    Scan scan = new Scan();
+    scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
+        clusterId, userId, flowId, flowRunId, appId, entityType));
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    ResultScanner scanner = entityTable.getResultScanner(hbaseConf, conn, scan);
+    for (Result result : scanner) {
+      TimelineEntity entity = parseEntity(result, fieldsToRetrieve,
+          true, createdTimeBegin, createdTimeEnd,
+          true, modifiedTimeBegin, modifiedTimeEnd,
+          isRelatedTo, relatesTo, infoFilters, configFilters, eventFilters,
+          metricFilters);
+      if (entity == null) {
+        continue;
+      }
+      if (entities.size() > limit) {
+        entities.pollLast();
+      }
+      entities.add(entity);
+    }
+    return entities;
+  }
+
+  private FlowContext lookupFlowContext(String clusterId, String appId)
+      throws IOException {
+    byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
+    Get get = new Get(rowKey);
+    Result result = appToFlowTable.getResult(hbaseConf, conn, get);
+    if (result != null && !result.isEmpty()) {
+      return new FlowContext(
+          AppToFlowColumn.FLOW_ID.readResult(result).toString(),
+          ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
+    } else {
+       throw new IOException(
+           "Unable to find the context flow ID and flow run ID for clusterId=" +
+           clusterId + ", appId=" + appId);
+    }
+  }
+
+  private static class FlowContext {
+    private String flowId;
+    private Long flowRunId;
+    public FlowContext(String flowId, Long flowRunId) {
+      this.flowId = flowId;
+      this.flowRunId = flowRunId;
+    }
+  }
+
+  private static void validateParams(String userId, String clusterId,
+      String appId, String entityType, String entityId, boolean checkEntityId) {
+    Preconditions.checkNotNull(userId, "userId shouldn't be null");
+    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
+    Preconditions.checkNotNull(appId, "appId shouldn't be null");
+    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
+    if (checkEntityId) {
+      Preconditions.checkNotNull(entityId, "entityId shouldn't be null");
+    }
+  }
+
+  private static TimelineEntity parseEntity(
+      Result result, EnumSet<Field> fieldsToRetrieve,
+      boolean checkCreatedTime, long createdTimeBegin, long createdTimeEnd,
+      boolean checkModifiedTime, long modifiedTimeBegin, long modifiedTimeEnd,
+      Map<String, Set<String>> isRelatedTo, Map<String, Set<String>> relatesTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> eventFilters, Set<String> metricFilters)
+          throws IOException {
+    if (result == null || result.isEmpty()) {
+      return null;
+    }
+    TimelineEntity entity = new TimelineEntity();
+    entity.setType(EntityColumn.TYPE.readResult(result).toString());
+    entity.setId(EntityColumn.ID.readResult(result).toString());
+
+    // fetch created time
+    entity.setCreatedTime(
+        ((Number) EntityColumn.CREATED_TIME.readResult(result)).longValue());
+    if (checkCreatedTime && (entity.getCreatedTime() < createdTimeBegin ||
+        entity.getCreatedTime() > createdTimeEnd)) {
+      return null;
+    }
+
+    // fetch modified time
+    entity.setCreatedTime(
+        ((Number) EntityColumn.MODIFIED_TIME.readResult(result)).longValue());
+    if (checkModifiedTime && (entity.getModifiedTime() < modifiedTimeBegin ||
+        entity.getModifiedTime() > modifiedTimeEnd)) {
+      return null;
+    }
+
+    // fetch is related to entities
+    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
+      readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO);
+      if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
+          entity.getIsRelatedToEntities(), isRelatedTo)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
+        entity.getIsRelatedToEntities().clear();
+      }
+    }
+
+    // fetch relates to entities
+    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
+      readRelationship(entity, result, EntityColumnPrefix.RELATES_TO);
+      if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
+          entity.getRelatesToEntities(), relatesTo)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.RELATES_TO)) {
+        entity.getRelatesToEntities().clear();
+      }
+    }
+
+    // fetch info
+    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
+      readKeyValuePairs(entity, result, EntityColumnPrefix.INFO);
+      if (checkInfo &&
+          !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.INFO)) {
+        entity.getInfo().clear();
+      }
+    }
+
+    // fetch configs
+    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
+      readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG);
+      if (checkConfigs && !TimelineReaderUtils.matchFilters(
+          entity.getConfigs(), configFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.CONFIGS)) {
+        entity.getConfigs().clear();
+      }
+    }
+
+    // fetch events
+    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
+      readEvents(entity, result);
+      if (checkEvents && !TimelineReaderUtils.matchEventFilters(
+          entity.getEvents(), eventFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.EVENTS)) {
+        entity.getEvents().clear();
+      }
+    }
+
+    // fetch metrics
+    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
+      readMetrics(entity, result);
+      if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
+          entity.getMetrics(), metricFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.METRICS)) {
+        entity.getMetrics().clear();
+      }
+    }
+    return entity;
+  }
+
+  private static void readRelationship(
+      TimelineEntity entity, Result result, EntityColumnPrefix prefix)
+          throws IOException {
+    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
+    Map<String, Object> columns = prefix.readResults(result);
+    for (Map.Entry<String, Object> column : columns.entrySet()) {
+      for (String id : Separator.VALUES.splitEncoded(
+          column.getValue().toString())) {
+        if (prefix.equals(EntityColumnPrefix.IS_RELATED_TO)) {
+          entity.addIsRelatedToEntity(column.getKey(), id);
+        } else {
+          entity.addRelatesToEntity(column.getKey(), id);
+        }
+      }
+    }
+  }
+
+  private static void readKeyValuePairs(
+      TimelineEntity entity, Result result, EntityColumnPrefix prefix)
+          throws IOException {
+    // info and configuration are of type Map<String, Object or String>
+    Map<String, Object> columns = prefix.readResults(result);
+    if (prefix.equals(EntityColumnPrefix.CONFIG)) {
+      for (Map.Entry<String, Object> column : columns.entrySet()) {
+        entity.addConfig(column.getKey(), column.getKey().toString());
+      }
+    } else {
+      entity.addInfo(columns);
+    }
+  }
+
+  private static void readEvents(TimelineEntity entity, Result result)
+      throws IOException {
+    Map<String, TimelineEvent> eventsMap = new HashMap<>();
+    Map<String, Object> eventsResult =
+        EntityColumnPrefix.EVENT.readResults(result);
+    for (Map.Entry<String,Object> eventResult : eventsResult.entrySet()) {
+      Collection<String> tokens =
+          Separator.VALUES.splitEncoded(eventResult.getKey());
+      if (tokens.size() != 2 && tokens.size() != 3) {
+        throw new IOException(
+            "Invalid event column name: " + eventResult.getKey());
+      }
+      Iterator<String> idItr = tokens.iterator();
+      String id = idItr.next();
+      String tsStr = idItr.next();
+      // TODO: timestamp is not correct via ser/des through UTF-8 string
+      Long ts =
+          TimelineWriterUtils.invert(Bytes.toLong(tsStr.getBytes(
+              StandardCharsets.UTF_8)));
+      String key = Separator.VALUES.joinEncoded(id, ts.toString());
+      TimelineEvent event = eventsMap.get(key);
+      if (event == null) {
+        event = new TimelineEvent();
+        event.setId(id);
+        event.setTimestamp(ts);
+        eventsMap.put(key, event);
+      }
+      if (tokens.size() == 3) {
+        String infoKey = idItr.next();
+        event.addInfo(infoKey, eventResult.getValue());
+      }
+    }
+    Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
+    entity.addEvents(eventsSet);
+  }
+
+  private static void readMetrics(TimelineEntity entity, Result result)
+      throws IOException {
+    NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+        EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
+    for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
+        metricsResult.entrySet()) {
+      TimelineMetric metric = new TimelineMetric();
+      metric.setId(metricResult.getKey());
+      // Simply assume that if the value set contains more than 1 elements, the
+      // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric
+      metric.setType(metricResult.getValue().size() > 1 ?
+          TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
+      metric.addValues(metricResult.getValue());
+      entity.addMetric(metric);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 3173e87..5290415 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -33,9 +33,14 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
@@ -55,6 +60,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
 
   private Connection conn;
   private TypedBufferedMutator<EntityTable> entityTable;
+  private TypedBufferedMutator<AppToFlowTable> appToFlowTable;
 
   private static final Log LOG = LogFactory
       .getLog(HBaseTimelineWriterImpl.class);
@@ -77,6 +83,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     Configuration hbaseConf = HBaseConfiguration.create(conf);
     conn = ConnectionFactory.createConnection(hbaseConf);
     entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
+    appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn);
   }
 
   /**
@@ -97,7 +104,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
 
       byte[] rowKey =
           EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId,
-              te);
+              te.getType(), te.getId());
 
       storeInfo(rowKey, te, flowVersion);
       storeEvents(rowKey, te.getEvents());
@@ -107,11 +114,37 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
           EntityColumnPrefix.IS_RELATED_TO);
       storeRelations(rowKey, te.getRelatesToEntities(),
           EntityColumnPrefix.RELATES_TO);
-    }
 
+      if (isApplicationCreated(te)) {
+        onApplicationCreated(
+            clusterId, userId, flowName, flowVersion, flowRunId, appId, te);
+      }
+    }
     return putStatus;
   }
 
+  private static boolean isApplicationCreated(TimelineEntity te) {
+    if (te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString())) {
+      boolean isAppCreated = false;
+      for (TimelineEvent event : te.getEvents()) {
+        if (event.getId().equals(
+            ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private void onApplicationCreated(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRunId, String appId,
+      TimelineEntity te) throws IOException {
+    byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
+    AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName);
+    AppToFlowColumn.FLOW_RUN_ID.store(
+        rowKey, appToFlowTable, null, flowRunId);
+  }
+
   /**
    * Stores the Relations from the {@linkplain TimelineEntity} object
    */
@@ -245,6 +278,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   public void flush() throws IOException {
     // flush all buffered mutators
     entityTable.flush();
+    appToFlowTable.flush();
   }
 
   /**
@@ -258,6 +292,11 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       // The close API performs flushing and releases any resources held
       entityTable.close();
     }
+    if (appToFlowTable != null) {
+      LOG.info("closing app_flow table");
+      // The close API performs flushing and releases any resources held
+      appToFlowTable.close();
+    }
     if (conn != null) {
       LOG.info("closing the hbase Connection");
       conn.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
index a5cc2ab..2c3897d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
 
 /**
@@ -70,6 +71,11 @@ public class TimelineSchemaCreator {
       int metricsTTL = Integer.parseInt(entityTableTTLMetrics);
       new EntityTable().setMetricsTTL(metricsTTL, hbaseConf);
     }
+    // Grab the appToflowTableName argument
+    String appToflowTableName = commandLine.getOptionValue("a2f");
+    if (StringUtils.isNotBlank(appToflowTableName)) {
+      hbaseConf.set(AppToFlowTable.TABLE_NAME_CONF_NAME, appToflowTableName);
+    }
     createAllTables(hbaseConf);
   }
 
@@ -95,6 +101,11 @@ public class TimelineSchemaCreator {
     o.setRequired(false);
     options.addOption(o);
 
+    o = new Option("a2f", "appToflowTableName", true, "app to flow table name");
+    o.setArgName("appToflowTableName");
+    o.setRequired(false);
+    options.addOption(o);
+
     CommandLineParser parser = new PosixParser();
     CommandLine commandLine = null;
     try {
@@ -120,6 +131,7 @@ public class TimelineSchemaCreator {
         throw new IOException("Cannot create table since admin is null");
       }
       new EntityTable().createTable(admin, hbaseConf);
+      new AppToFlowTable().createTable(admin, hbaseConf);
     } finally {
       if (conn != null) {
         conn.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
new file mode 100644
index 0000000..423037a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+import java.io.IOException;
+
+/**
+ * Identifies fully qualified columns for the {@link AppToFlowTable}.
+ */
+public enum AppToFlowColumn implements Column<AppToFlowTable> {
+
+  /**
+   * The flow ID
+   */
+  FLOW_ID(AppToFlowColumnFamily.MAPPING, "flow_id"),
+
+  /**
+   * The flow run ID
+   */
+  FLOW_RUN_ID(AppToFlowColumnFamily.MAPPING, "flow_run_id");
+
+  private final ColumnHelper<AppToFlowTable> column;
+  private final ColumnFamily<AppToFlowTable> columnFamily;
+  private final String columnQualifier;
+  private final byte[] columnQualifierBytes;
+
+  AppToFlowColumn(ColumnFamily<AppToFlowTable> columnFamily,
+      String columnQualifier) {
+    this.columnFamily = columnFamily;
+    this.columnQualifier = columnQualifier;
+    // Future-proof by ensuring the right column prefix hygiene.
+    this.columnQualifierBytes =
+        Bytes.toBytes(Separator.SPACE.encode(columnQualifier));
+    this.column = new ColumnHelper<AppToFlowTable>(columnFamily);
+  }
+
+  /**
+   * @return the column name value
+   */
+  private String getColumnQualifier() {
+    return columnQualifier;
+  }
+
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<AppToFlowTable> tableMutator, Long timestamp,
+      Object inputValue) throws IOException {
+    column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
+        inputValue);
+  }
+
+  public Object readResult(Result result) throws IOException {
+    return column.readResult(result, columnQualifierBytes);
+  }
+
+  /**
+   * Retrieve an {@link AppToFlowColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+   * and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnQualifier Name of the column to retrieve
+   * @return the corresponding {@link AppToFlowColumn} or null
+   */
+  public static final AppToFlowColumn columnFor(String columnQualifier) {
+
+    // Match column based on value, assume column family matches.
+    for (AppToFlowColumn ec : AppToFlowColumn.values()) {
+      // Find a match based only on name.
+      if (ec.getColumnQualifier().equals(columnQualifier)) {
+        return ec;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve an {@link AppToFlowColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
+   * if and only if {@code a.equals(b) & x.equals(y)} or
+   * {@code (x == y == null)}
+   *
+   * @param columnFamily The columnFamily for which to retrieve the column.
+   * @param name Name of the column to retrieve
+   * @return the corresponding {@link AppToFlowColumn} or null if both arguments
+   *         don't match.
+   */
+  public static final AppToFlowColumn columnFor(
+      AppToFlowColumnFamily columnFamily, String name) {
+
+    for (AppToFlowColumn ec : AppToFlowColumn.values()) {
+      // Find a match based column family and on name.
+      if (ec.columnFamily.equals(columnFamily)
+          && ec.getColumnQualifier().equals(name)) {
+        return ec;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnFamily.java
new file mode 100644
index 0000000..e74235f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnFamily.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents the app_flow table column families.
+ */
+public enum AppToFlowColumnFamily implements ColumnFamily<AppToFlowTable> {
+  /**
+   * Mapping column family houses known columns such as flowId and flowRunId
+   */
+  MAPPING("m");
+
+  /**
+   * Byte representation of this column family.
+   */
+  private final byte[] bytes;
+
+  /**
+   * @param value create a column family with this name. Must be lower case and
+   *          without spaces.
+   */
+  AppToFlowColumnFamily(String value) {
+    // column families should be lower case and not contain any spaces.
+    this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+  }
+
+  public byte[] getBytes() {
+    return Bytes.copy(bytes);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
new file mode 100644
index 0000000..ad4fec6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents a rowkey for the app_flow table.
+ */
+public class AppToFlowRowKey {
+  /**
+   * Constructs a row key prefix for the app_flow table as follows:
+   * {@code clusterId!AppId}
+   *
+   * @param clusterId
+   * @param appId
+   * @return byte array with the row key
+   */
+  public static byte[] getRowKey(String clusterId, String appId) {
+    return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, appId));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java
new file mode 100644
index 0000000..2467856
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
+
+import java.io.IOException;
+
+/**
+ * The app_flow table as column families mapping. Mapping stores
+ * appId to flowId and flowRunId mapping information
+ *
+ * Example app_flow table record:
+ *
+ * <pre>
+ * |--------------------------------------|
+ * |  Row       | Column Family           |
+ * |  key       | info                    |
+ * |--------------------------------------|
+ * | clusterId! | flowId:                 |
+ * | AppId      | foo@daily_hive_report   |
+ * |            |                         |
+ * |            | flowRunId:              |
+ * |            | 1452828720457           |
+ * |            |                         |
+ * |            |                         |
+ * |            |                         |
+ * |--------------------------------------|
+ * </pre>
+ */
+public class AppToFlowTable extends BaseTable<AppToFlowTable> {
+  /** app_flow prefix */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "app-flow";
+
+  /** config param name that specifies the app_flow table name */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /** default value for app_flow table name */
+  private static final String DEFAULT_TABLE_NAME = "timelineservice.app_flow";
+
+  private static final Log LOG = LogFactory.getLog(AppToFlowTable.class);
+
+  public AppToFlowTable() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
+   * (org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor appToFlowTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor mappCF =
+        new HColumnDescriptor(AppToFlowColumnFamily.MAPPING.getBytes());
+    mappCF.setBloomFilterType(BloomType.ROWCOL);
+    appToFlowTableDescp.addFamily(mappCF);
+
+    appToFlowTableDescp
+        .setRegionSplitPolicyClassName(
+            "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
+    appToFlowTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
+        TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+    admin.createTable(appToFlowTableDescp,
+        TimelineHBaseSchemaConstants.getUsernameSplits());
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java
new file mode 100644
index 0000000..df7ffc1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
index e8d8b5c..abba79a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.BufferedMutator;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
@@ -94,6 +96,20 @@ public abstract class BaseTable<T> {
   }
 
   /**
+   *
+   * @param hbaseConf used to read settings that override defaults
+   * @param conn used to create table from
+   * @param get that specifies what single row you want to get from this table
+   * @return result of get operation
+   * @throws IOException
+   */
+  public Result getResult(Configuration hbaseConf, Connection conn, Get get)
+      throws IOException {
+    Table table = conn.getTable(getTableName(hbaseConf));
+    return table.get(get);
+  }
+
+  /**
    * Get the table name for this table.
    *
    * @param hbaseConf

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
index 671c824..509ff49 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
@@ -64,7 +64,7 @@ public interface ColumnPrefix<T> {
   public Object readResult(Result result, String qualifier) throws IOException;
 
   /**
-   * @param resultfrom which to read columns
+   * @param result from which to read columns
    * @return the latest values of columns in the column family with this prefix
    *         (or all of them if the prefix value is null).
    * @throws IOException

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java
deleted file mode 100644
index 5518a27..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.common;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * contains the constants used in the context of schema accesses for
- * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
- * information
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class TimelineEntitySchemaConstants {
-
-  /**
-   * Used to create a pre-split for tables starting with a username in the
-   * prefix. TODO: this may have to become a config variable (string with
-   * separators) so that different installations can presplit based on their own
-   * commonly occurring names.
-   */
-  private final static byte[][] USERNAME_SPLITS = { Bytes.toBytes("a"),
-      Bytes.toBytes("ad"), Bytes.toBytes("an"), Bytes.toBytes("b"),
-      Bytes.toBytes("ca"), Bytes.toBytes("cl"), Bytes.toBytes("d"),
-      Bytes.toBytes("e"), Bytes.toBytes("f"), Bytes.toBytes("g"),
-      Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j"),
-      Bytes.toBytes("k"), Bytes.toBytes("l"), Bytes.toBytes("m"),
-      Bytes.toBytes("n"), Bytes.toBytes("o"), Bytes.toBytes("q"),
-      Bytes.toBytes("r"), Bytes.toBytes("s"), Bytes.toBytes("se"),
-      Bytes.toBytes("t"), Bytes.toBytes("u"), Bytes.toBytes("v"),
-      Bytes.toBytes("w"), Bytes.toBytes("x"), Bytes.toBytes("y"),
-      Bytes.toBytes("z") };
-
-  /**
-   * The length at which keys auto-split
-   */
-  public static final String USERNAME_SPLIT_KEY_PREFIX_LENGTH = "4";
-
-  /**
-   * @return splits for splits where a user is a prefix.
-   */
-  public final static byte[][] getUsernameSplits() {
-    byte[][] kloon = USERNAME_SPLITS.clone();
-    // Deep copy.
-    for (int row = 0; row < USERNAME_SPLITS.length; row++) {
-      kloon[row] = Bytes.copy(USERNAME_SPLITS[row]);
-    }
-    return kloon;
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java
new file mode 100644
index 0000000..bbf498a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * contains the constants used in the context of schema accesses for
+ * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
+ * information
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TimelineHBaseSchemaConstants {
+
+  /**
+   * Used to create a pre-split for tables starting with a username in the
+   * prefix. TODO: this may have to become a config variable (string with
+   * separators) so that different installations can presplit based on their own
+   * commonly occurring names.
+   */
+  private final static byte[][] USERNAME_SPLITS = { Bytes.toBytes("a"),
+      Bytes.toBytes("ad"), Bytes.toBytes("an"), Bytes.toBytes("b"),
+      Bytes.toBytes("ca"), Bytes.toBytes("cl"), Bytes.toBytes("d"),
+      Bytes.toBytes("e"), Bytes.toBytes("f"), Bytes.toBytes("g"),
+      Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j"),
+      Bytes.toBytes("k"), Bytes.toBytes("l"), Bytes.toBytes("m"),
+      Bytes.toBytes("n"), Bytes.toBytes("o"), Bytes.toBytes("q"),
+      Bytes.toBytes("r"), Bytes.toBytes("s"), Bytes.toBytes("se"),
+      Bytes.toBytes("t"), Bytes.toBytes("u"), Bytes.toBytes("v"),
+      Bytes.toBytes("w"), Bytes.toBytes("x"), Bytes.toBytes("y"),
+      Bytes.toBytes("z") };
+
+  /**
+   * The length at which keys auto-split
+   */
+  public static final String USERNAME_SPLIT_KEY_PREFIX_LENGTH = "4";
+
+  /**
+   * @return splits for splits where a user is a prefix.
+   */
+  public final static byte[][] getUsernameSplits() {
+    byte[][] kloon = USERNAME_SPLITS.clone();
+    // Deep copy.
+    for (int row = 0; row < USERNAME_SPLITS.length; row++) {
+      kloon[row] = Bytes.copy(USERNAME_SPLITS[row]);
+    }
+    return kloon;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java
new file mode 100644
index 0000000..91d7ba4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+
+public class TimelineReaderUtils {
+  /**
+   *
+   * @param entityRelations the relations of an entity
+   * @param relationFilters the relations for filtering
+   * @return a boolean flag to indicate if both match
+   */
+  public static boolean matchRelations(
+      Map<String, Set<String>> entityRelations,
+      Map<String, Set<String>> relationFilters) {
+    for (Map.Entry<String, Set<String>> relation : relationFilters.entrySet()) {
+      Set<String> ids = entityRelations.get(relation.getKey());
+      if (ids == null) {
+        return false;
+      }
+      for (String id : relation.getValue()) {
+        if (!ids.contains(id)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   *
+   * @param map the map of key/value pairs in an entity
+   * @param filters the map of key/value pairs for filtering
+   * @return a boolean flag to indicate if both match
+   */
+  public static boolean matchFilters(Map<String, ? extends Object> map,
+      Map<String, ? extends Object> filters) {
+    for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
+      Object value = map.get(filter.getKey());
+      if (value == null) {
+        return false;
+      }
+      if (!value.equals(filter.getValue())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   *
+   * @param entityEvents the set of event objects in an entity
+   * @param eventFilters the set of event Ids for filtering
+   * @return a boolean flag to indicate if both match
+   */
+  public static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
+      Set<String> eventFilters) {
+    Set<String> eventIds = new HashSet<String>();
+    for (TimelineEvent event : entityEvents) {
+      eventIds.add(event.getId());
+    }
+    for (String eventFilter : eventFilters) {
+      if (!eventIds.contains(eventFilter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   *
+   * @param metrics the set of metric objects in an entity
+   * @param metricFilters the set of metric Ids for filtering
+   * @return a boolean flag to indicate if both match
+   */
+  public static boolean matchMetricFilters(Set<TimelineMetric> metrics,
+      Set<String> metricFilters) {
+    Set<String> metricIds = new HashSet<String>();
+    for (TimelineMetric metric : metrics) {
+      metricIds.add(metric.getId());
+    }
+
+    for (String metricFilter : metricFilters) {
+      if (!metricIds.contains(metricFilter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
index 90da966..26e7748 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
@@ -62,7 +62,7 @@ public enum EntityColumn implements Column<EntityTable> {
   private final String columnQualifier;
   private final byte[] columnQualifierBytes;
 
-  private EntityColumn(ColumnFamily<EntityTable> columnFamily,
+  EntityColumn(ColumnFamily<EntityTable> columnFamily,
       String columnQualifier) {
     this.columnFamily = columnFamily;
     this.columnQualifier = columnQualifier;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java
index 8a95d12..7c63727 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java
@@ -53,7 +53,7 @@ public enum EntityColumnFamily implements ColumnFamily<EntityTable> {
    * @param value create a column family with this name. Must be lower case and
    *          without spaces.
    */
-  private EntityColumnFamily(String value) {
+  EntityColumnFamily(String value) {
     // column families should be lower case and not contain any spaces.
     this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index 8b7bc3e..58272ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -80,7 +80,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
    * @param columnFamily that this column is stored in.
    * @param columnPrefix for this column.
    */
-  private EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
+  EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
       String columnPrefix) {
     column = new ColumnHelper<EntityTable>(columnFamily);
     this.columnFamily = columnFamily;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
index 3e17ad0..9a72be0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -55,17 +55,45 @@ public class EntityRowKey {
 
   /**
    * Constructs a row key prefix for the entity table as follows:
-   * {@code userName!clusterId!flowId!flowRunId!AppId}
+   * {@code userName!clusterId!flowId!flowRunId!AppId!entityType!}
    *
    * @param clusterId
    * @param userId
    * @param flowId
    * @param flowRunId
    * @param appId
+   * @param entityType
    * @return byte array with the row key prefix
    */
+  public static byte[] getRowKeyPrefix(String clusterId, String userId,
+      String flowId, Long flowRunId, String appId, String entityType) {
+    byte[] first =
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
+            flowId));
+    // Note that flowRunId is a long, so we can't encode them all at the same
+    // time.
+    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
+    byte[] third =
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType, ""));
+    return Separator.QUALIFIERS.join(first, second, third);
+  }
+
+  /**
+   * Constructs a row key for the entity table as follows:
+   * {@code userName!clusterId!flowId!flowRunId!AppId!entityType!entityId}
+   *
+   * @param clusterId
+   * @param userId
+   * @param flowId
+   * @param flowRunId
+   * @param appId
+   * @param entityType
+   * @param entityId
+   * @return byte array with the row key
+   */
   public static byte[] getRowKey(String clusterId, String userId,
-      String flowId, Long flowRunId, String appId, TimelineEntity te) {
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId) {
     byte[] first =
         Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
             flowId));
@@ -73,8 +101,8 @@ public class EntityRowKey {
     // time.
     byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
     byte[] third =
-        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, te.getType(),
-            te.getId()));
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType,
+            entityId));
     return Separator.QUALIFIERS.join(first, second, third);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
index 2ae7d39..f657a14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEntitySchemaConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
 
 /**
  * The entity table as column families info, config and metrics. Info stores
@@ -75,7 +75,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEnti
 public class EntityTable extends BaseTable<EntityTable> {
   /** entity prefix */
   private static final String PREFIX =
-      YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".entity";
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "entity";
 
   /** config param name that specifies the entity table name */
   public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
@@ -146,9 +146,9 @@ public class EntityTable extends BaseTable<EntityTable> {
     entityTableDescp
         .setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
     entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
-        TimelineEntitySchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+        TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
     admin.createTable(entityTableDescp,
-        TimelineEntitySchemaConstants.getUsernameSplits());
+        TimelineHBaseSchemaConstants.getUsernameSplits());
     LOG.info("Status of table creation for " + table.getNameAsString() + "="
         + admin.tableExists(table));
   }