You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2015/09/22 22:47:32 UTC

[2/2] hadoop git commit: YARN-4074. [timeline reader] implement support for querying for flows and flow runs (sjlee via vrushali)

YARN-4074. [timeline reader] implement support for querying for flows and flow runs (sjlee via vrushali)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2e7e0f0b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2e7e0f0b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2e7e0f0b

Branch: refs/heads/YARN-2928
Commit: 2e7e0f0bbba4b3ab4280de06d595018bc5dec87b
Parents: 4b37985
Author: Vrushali <vr...@apache.org>
Authored: Tue Sep 22 13:42:30 2015 -0700
Committer: Vrushali <vr...@apache.org>
Committed: Tue Sep 22 13:42:30 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../timelineservice/FlowActivityEntity.java     | 183 ++++++++
 .../api/records/timelineservice/FlowEntity.java | 103 -----
 .../records/timelineservice/FlowRunEntity.java  | 121 ++++++
 .../timelineservice/TimelineEntityType.java     |  31 +-
 .../TestTimelineServiceRecords.java             |  14 +-
 .../TestTimelineServiceClientIntegration.java   |   2 +-
 .../collector/TimelineCollectorWebService.java  |   6 +-
 .../storage/ApplicationEntityReader.java        | 229 ++++++++++
 .../storage/FlowActivityEntityReader.java       | 168 +++++++
 .../storage/FlowRunEntityReader.java            | 136 ++++++
 .../storage/GenericEntityReader.java            | 389 +++++++++++++++++
 .../storage/HBaseTimelineReaderImpl.java        | 434 +------------------
 .../storage/TimelineEntityReader.java           | 223 ++++++++++
 .../storage/TimelineEntityReaderFactory.java    |  97 +++++
 .../storage/application/ApplicationRowKey.java  |  68 ++-
 .../storage/apptoflow/AppToFlowRowKey.java      |  31 ++
 .../storage/common/BaseTable.java               |   3 +-
 .../storage/entity/EntityRowKey.java            |  76 +++-
 .../storage/flow/FlowActivityRowKey.java        |   7 +-
 .../storage/flow/FlowRunRowKey.java             |  50 ++-
 .../storage/flow/FlowScanner.java               |  18 +-
 .../storage/TestHBaseTimelineStorage.java       |  34 +-
 .../storage/flow/TestFlowDataGenerator.java     |  39 +-
 .../flow/TestHBaseStorageFlowActivity.java      | 131 ++++--
 .../storage/flow/TestHBaseStorageFlowRun.java   | 105 +++--
 26 files changed, 2021 insertions(+), 680 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index cc0f014..114b2fe 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -106,6 +106,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3901. Populate flow run data in the flow_run & flow activity tables
     (Vrushali C via sjlee)
 
+    YARN-4074. [timeline reader] implement support for querying for flows
+    and flow runs (sjlee via vrushali)
+
   IMPROVEMENTS
 
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java
new file mode 100644
index 0000000..163bd5c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Entity that represents a record for flow activity. It's essentially a
+ * container entity for flow runs with limited information.
+ */
+@Public
+@Unstable
+public class FlowActivityEntity extends TimelineEntity {
+  public static final String CLUSTER_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX +  "CLUSTER";
+  public static final String DATE_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX +  "DATE";
+  public static final String USER_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER";
+  public static final String FLOW_NAME_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_NAME";
+
+  private final NavigableSet<FlowRunEntity> flowRuns = new TreeSet<>();
+
+  public FlowActivityEntity() {
+    super(TimelineEntityType.YARN_FLOW_ACTIVITY.toString());
+    // set config to null
+    setConfigs(null);
+  }
+
+  public FlowActivityEntity(String cluster, long time, String user,
+      String flowName) {
+    this();
+    setCluster(cluster);
+    setDate(time);
+    setUser(user);
+    setFlowName(flowName);
+  }
+
+  public FlowActivityEntity(TimelineEntity entity) {
+    super(entity);
+    if (!TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entity.getType())) {
+      throw new IllegalArgumentException("Incompatible entity type: " +
+          getId());
+    }
+    // set config to null
+    setConfigs(null);
+  }
+
+  @XmlElement(name = "id")
+  @Override
+  public String getId() {
+    // flow activity: cluster/day/user@flow_name
+    String id = super.getId();
+    if (id == null) {
+      StringBuilder sb = new StringBuilder();
+      sb.append(getCluster());
+      sb.append('/');
+      sb.append(getDate().getTime());
+      sb.append('/');
+      sb.append(getUser());
+      sb.append('@');
+      sb.append(getFlowName());
+      id = sb.toString();
+      setId(id);
+    }
+    return id;
+  }
+
+  @Override
+  public int compareTo(TimelineEntity entity) {
+    int comparison = getType().compareTo(entity.getType());
+    if (comparison == 0) {
+      // order by cluster, date (descending), user, and flow name
+      FlowActivityEntity other = (FlowActivityEntity)entity;
+      int clusterComparison = getCluster().compareTo(other.getCluster());
+      if (clusterComparison != 0) {
+        return clusterComparison;
+      }
+      int dateComparisonDescending =
+          (int)(other.getDate().getTime() - getDate().getTime()); // descending
+      if (dateComparisonDescending != 0) {
+        return dateComparisonDescending; // descending
+      }
+      int userComparison = getUser().compareTo(other.getUser());
+      if (userComparison != 0) {
+        return userComparison;
+      }
+      return getFlowName().compareTo(other.getFlowName());
+    } else {
+      return comparison;
+    }
+  }
+
+  /**
+   * Reuse the base class equals method.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    return super.equals(obj);
+  }
+
+  /**
+   * Reuse the base class hashCode method.
+   */
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  public String getCluster() {
+    return (String)getInfo().get(CLUSTER_INFO_KEY);
+  }
+
+  public void setCluster(String cluster) {
+    addInfo(CLUSTER_INFO_KEY, cluster);
+  }
+
+  public Date getDate() {
+    return (Date)getInfo().get(DATE_INFO_KEY);
+  }
+
+  public void setDate(long time) {
+    Date date = new Date(time);
+    addInfo(DATE_INFO_KEY, date);
+  }
+
+  public String getUser() {
+    return (String)getInfo().get(USER_INFO_KEY);
+  }
+
+  public void setUser(String user) {
+    addInfo(USER_INFO_KEY, user);
+  }
+
+  public String getFlowName() {
+    return (String)getInfo().get(FLOW_NAME_INFO_KEY);
+  }
+
+  public void setFlowName(String flowName) {
+    addInfo(FLOW_NAME_INFO_KEY, flowName);
+  }
+
+  public void addFlowRun(FlowRunEntity run) {
+    flowRuns.add(run);
+  }
+
+  public void addFlowRuns(Collection<FlowRunEntity> runs) {
+    flowRuns.addAll(runs);
+  }
+
+  @XmlElement(name = "flowruns")
+  public NavigableSet<FlowRunEntity> getFlowRuns() {
+    return flowRuns;
+  }
+
+  public int getNumberOfRuns() {
+    return flowRuns.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java
deleted file mode 100644
index 4554778..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.api.records.timelineservice;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import javax.xml.bind.annotation.XmlElement;
-
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class FlowEntity extends HierarchicalTimelineEntity {
-  public static final String USER_INFO_KEY =
-      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER";
-  public static final String FLOW_NAME_INFO_KEY =
-      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_NAME";
-  public static final String FLOW_VERSION_INFO_KEY =
-      TimelineEntity.SYSTEM_INFO_KEY_PREFIX +  "FLOW_VERSION";
-  public static final String FLOW_RUN_ID_INFO_KEY =
-      TimelineEntity.SYSTEM_INFO_KEY_PREFIX +  "FLOW_RUN_ID";
-
-  public FlowEntity() {
-    super(TimelineEntityType.YARN_FLOW.toString());
-  }
-
-  public FlowEntity(TimelineEntity entity) {
-    super(entity);
-    if (!entity.getType().equals(TimelineEntityType.YARN_FLOW.toString())) {
-      throw new IllegalArgumentException("Incompatible entity type: " + getId());
-    }
-  }
-
-  @XmlElement(name = "id")
-  @Override
-  public String getId() {
-    //Flow id schema: user@flow_name(or id)/version/run_id
-    String id = super.getId();
-    if (id == null) {
-      StringBuilder sb = new StringBuilder();
-      sb.append(getInfo().get(USER_INFO_KEY).toString());
-      sb.append('@');
-      sb.append(getInfo().get(FLOW_NAME_INFO_KEY).toString());
-      sb.append('/');
-      sb.append(getInfo().get(FLOW_VERSION_INFO_KEY).toString());
-      sb.append('/');
-      sb.append(getInfo().get(FLOW_RUN_ID_INFO_KEY).toString());
-      id = sb.toString();
-      setId(id);
-    }
-    return id;
-  }
-
-  public String getUser() {
-    Object user = getInfo().get(USER_INFO_KEY);
-    return user == null ? null : user.toString();
-  }
-
-  public void setUser(String user) {
-    addInfo(USER_INFO_KEY, user);
-  }
-
-  public String getName() {
-    Object name = getInfo().get(FLOW_NAME_INFO_KEY);
-    return name == null ? null : name.toString();
-  }
-
-  public void setName(String name) {
-    addInfo(FLOW_NAME_INFO_KEY, name);
-  }
-
-  public String getVersion() {
-    Object version = getInfo().get(FLOW_VERSION_INFO_KEY);
-    return version == null ? null : version.toString();
-  }
-
-  public void setVersion(String version) {
-    addInfo(FLOW_VERSION_INFO_KEY, version);
-  }
-
-  public long getRunId() {
-    Object runId = getInfo().get(FLOW_RUN_ID_INFO_KEY);
-    return runId == null ? 0L : (Long) runId;
-  }
-
-  public void setRunId(long runId) {
-    addInfo(FLOW_RUN_ID_INFO_KEY, runId);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java
new file mode 100644
index 0000000..3c3ffb4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class FlowRunEntity extends HierarchicalTimelineEntity {
+  public static final String USER_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER";
+  public static final String FLOW_NAME_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_NAME";
+  public static final String FLOW_VERSION_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX +  "FLOW_VERSION";
+  public static final String FLOW_RUN_ID_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX +  "FLOW_RUN_ID";
+  public static final String FLOW_RUN_END_TIME =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_RUN_END_TIME";
+
+  public FlowRunEntity() {
+    super(TimelineEntityType.YARN_FLOW_RUN.toString());
+    // set config to null
+    setConfigs(null);
+  }
+
+  public FlowRunEntity(TimelineEntity entity) {
+    super(entity);
+    if (!entity.getType().equals(TimelineEntityType.YARN_FLOW_RUN.toString())) {
+      throw new IllegalArgumentException("Incompatible entity type: " + getId());
+    }
+    // set config to null
+    setConfigs(null);
+  }
+
+  @XmlElement(name = "id")
+  @Override
+  public String getId() {
+    //Flow id schema: user@flow_name(or id)/run_id
+    String id = super.getId();
+    if (id == null) {
+      StringBuilder sb = new StringBuilder();
+      sb.append(getInfo().get(USER_INFO_KEY).toString());
+      sb.append('@');
+      sb.append(getInfo().get(FLOW_NAME_INFO_KEY).toString());
+      sb.append('/');
+      sb.append(getInfo().get(FLOW_RUN_ID_INFO_KEY).toString());
+      id = sb.toString();
+      setId(id);
+    }
+    return id;
+  }
+
+  public String getUser() {
+    return (String)getInfo().get(USER_INFO_KEY);
+  }
+
+  public void setUser(String user) {
+    addInfo(USER_INFO_KEY, user);
+  }
+
+  public String getName() {
+    return (String)getInfo().get(FLOW_NAME_INFO_KEY);
+  }
+
+  public void setName(String name) {
+    addInfo(FLOW_NAME_INFO_KEY, name);
+  }
+
+  public String getVersion() {
+    return (String)getInfo().get(FLOW_VERSION_INFO_KEY);
+  }
+
+  public void setVersion(String version) {
+    addInfo(FLOW_VERSION_INFO_KEY, version);
+  }
+
+  public long getRunId() {
+    Object runId = getInfo().get(FLOW_RUN_ID_INFO_KEY);
+    return runId == null ? 0L : (Long) runId;
+  }
+
+  public void setRunId(long runId) {
+    addInfo(FLOW_RUN_ID_INFO_KEY, runId);
+  }
+
+  public long getStartTime() {
+    return getCreatedTime();
+  }
+
+  public void setStartTime(long startTime) {
+    setCreatedTime(startTime);
+  }
+
+  public long getMaxEndTime() {
+    Object time = getInfo().get(FLOW_RUN_END_TIME);
+    return time == null ? 0L : (Long)time;
+  }
+
+  public void setMaxEndTime(long endTime) {
+    addInfo(FLOW_RUN_END_TIME, endTime);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
index 6062fe1..ba32e20 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
@@ -24,21 +24,25 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceStability.Unstable
 public enum TimelineEntityType {
   YARN_CLUSTER,
-  YARN_FLOW,
+  YARN_FLOW_RUN,
   YARN_APPLICATION,
   YARN_APPLICATION_ATTEMPT,
   YARN_CONTAINER,
   YARN_USER,
-  YARN_QUEUE;
+  YARN_QUEUE,
+  YARN_FLOW_ACTIVITY;
 
+  /**
+   * Whether the input type can be a parent of this entity.
+   */
   public boolean isParent(TimelineEntityType type) {
     switch (this) {
       case YARN_CLUSTER:
         return false;
-      case YARN_FLOW:
-        return YARN_FLOW == type || YARN_CLUSTER == type;
+      case YARN_FLOW_RUN:
+        return YARN_FLOW_RUN == type || YARN_CLUSTER == type;
       case YARN_APPLICATION:
-        return YARN_FLOW == type || YARN_CLUSTER == type;
+        return YARN_FLOW_RUN == type || YARN_CLUSTER == type;
       case YARN_APPLICATION_ATTEMPT:
         return YARN_APPLICATION == type;
       case YARN_CONTAINER:
@@ -50,12 +54,15 @@ public enum TimelineEntityType {
     }
   }
 
+  /**
+   * Whether the input type can be a child of this entity.
+   */
   public boolean isChild(TimelineEntityType type) {
     switch (this) {
       case YARN_CLUSTER:
-        return YARN_FLOW == type || YARN_APPLICATION == type;
-      case YARN_FLOW:
-        return YARN_FLOW == type || YARN_APPLICATION == type;
+        return YARN_FLOW_RUN == type || YARN_APPLICATION == type;
+      case YARN_FLOW_RUN:
+        return YARN_FLOW_RUN == type || YARN_APPLICATION == type;
       case YARN_APPLICATION:
         return YARN_APPLICATION_ATTEMPT == type;
       case YARN_APPLICATION_ATTEMPT:
@@ -68,4 +75,12 @@ public enum TimelineEntityType {
         return false;
     }
   }
+
+  /**
+   * Whether the type of this entity matches the type indicated by the input
+   * argument.
+   */
+  public boolean matches(String typeString) {
+    return toString().equals(typeString);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
index 78943e0..7c9acf2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
@@ -182,14 +182,14 @@ public class TestTimelineServiceRecords {
     ClusterEntity cluster = new ClusterEntity();
     cluster.setId("test cluster id");
 
-    FlowEntity flow1 = new FlowEntity();
+    FlowRunEntity flow1 = new FlowRunEntity();
     //flow1.setId("test flow id 1");
     flow1.setUser(user.getId());
     flow1.setName("test flow name 1");
     flow1.setVersion("test flow version 1");
     flow1.setRunId(1L);
 
-    FlowEntity flow2 = new FlowEntity();
+    FlowRunEntity flow2 = new FlowRunEntity();
     //flow2.setId("test flow run id 2");
     flow2.setUser(user.getId());
     flow2.setName("test flow name 2");
@@ -213,19 +213,19 @@ public class TestTimelineServiceRecords {
         ApplicationAttemptId.newInstance(
             ApplicationId.newInstance(0, 1), 1), 1).toString());
 
-    cluster.addChild(TimelineEntityType.YARN_FLOW.toString(), flow1.getId());
+    cluster.addChild(TimelineEntityType.YARN_FLOW_RUN.toString(), flow1.getId());
     flow1
         .setParent(TimelineEntityType.YARN_CLUSTER.toString(), cluster.getId());
-    flow1.addChild(TimelineEntityType.YARN_FLOW.toString(), flow2.getId());
-    flow2.setParent(TimelineEntityType.YARN_FLOW.toString(), flow1.getId());
+    flow1.addChild(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId());
+    flow2.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow1.getId());
     flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(), app1.getId());
     flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(), app2.getId());
-    app1.setParent(TimelineEntityType.YARN_FLOW.toString(), flow2.getId());
+    app1.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId());
     app1.addChild(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
         appAttempt.getId());
     appAttempt
         .setParent(TimelineEntityType.YARN_APPLICATION.toString(), app1.getId());
-    app2.setParent(TimelineEntityType.YARN_FLOW.toString(), flow2.getId());
+    app2.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId());
     appAttempt.addChild(TimelineEntityType.YARN_CONTAINER.toString(),
         container.getId());
     container.setParent(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
index 69031a2..5672759 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -107,7 +107,7 @@ public class TestTimelineServiceClientIntegration {
       client.start();
       ClusterEntity cluster = new ClusterEntity();
       cluster.setId(YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
-      FlowEntity flow = new FlowEntity();
+      FlowRunEntity flow = new FlowRunEntity();
       flow.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
       flow.setName("test_flow_name");
       flow.setVersion("test_flow_version");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
index 42fa365..8f595e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEnti
 import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.ClusterEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -205,8 +205,8 @@ public class TimelineCollectorWebService {
           case YARN_CLUSTER:
             entitiesToReturn.addEntity(new ClusterEntity(entity));
             break;
-          case YARN_FLOW:
-            entitiesToReturn.addEntity(new FlowEntity(entity));
+          case YARN_FLOW_RUN:
+            entitiesToReturn.addEntity(new FlowRunEntity(entity));
             break;
           case YARN_APPLICATION:
             entitiesToReturn.addEntity(new ApplicationEntity(entity));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
new file mode 100644
index 0000000..dfbc31d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+
+/**
+ * Timeline entity reader for application entities that are stored in the
+ * application table.
+ */
+class ApplicationEntityReader extends GenericEntityReader {
+  // Stateless table descriptor shared by all reader instances.
+  private static final ApplicationTable APPLICATION_TABLE =
+      new ApplicationTable();
+
+  /**
+   * Constructor for a multiple-entity query. The time-range, relation, info,
+   * config, metric and event filters are evaluated row by row in
+   * {@link #parseEntity}.
+   */
+  public ApplicationEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
+        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
+        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
+        eventFilters, fieldsToRetrieve);
+  }
+
+  /**
+   * Constructor for a single-entity query identified by {@code entityId}.
+   */
+  public ApplicationEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
+        fieldsToRetrieve);
+  }
+
+  /**
+   * Uses the {@link ApplicationTable}.
+   */
+  // NOTE(review): @Override is missing here although this overrides the
+  // superclass method; adding it would let the compiler verify the signature.
+  protected BaseTable<?> getTable() {
+    return APPLICATION_TABLE;
+  }
+
+  @Override
+  protected Result getResult(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    // A single application maps to exactly one row; fetch it by the full row
+    // key and request all cell versions (metric values are stored as multiple
+    // versions of the same cell).
+    byte[] rowKey =
+        ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId,
+            appId);
+    Get get = new Get(rowKey);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    return table.getResult(hbaseConf, conn, get);
+  }
+
+  @Override
+  protected Iterable<Result> getResults(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    // If getEntities() is called for an application, there can be at most
+    // one entity. If the entity passes the filter, it is returned. Otherwise,
+    // an empty set is returned.
+    byte[] rowKey = ApplicationRowKey.getRowKey(clusterId, userId, flowId,
+        flowRunId, appId);
+    Get get = new Get(rowKey);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    Result result = table.getResult(hbaseConf, conn, get);
+    // parseEntity is invoked here only to evaluate the filters; the returned
+    // Result is presumably parsed again by the caller, i.e. the same row is
+    // parsed twice — worth confirming/optimizing upstream.
+    TimelineEntity entity = parseEntity(result);
+    Set<Result> set;
+    if (entity != null) {
+      set = Collections.singleton(result);
+    } else {
+      set = Collections.emptySet();
+    }
+    return set;
+  }
+
+  /**
+   * Converts one application-table row into a {@link TimelineEntity} of type
+   * YARN_APPLICATION. For multiple-entity reads, returns {@code null} as soon
+   * as any active filter rejects the row; for a single-entity read the
+   * created/modified time-range checks are skipped.
+   */
+  @Override
+  protected TimelineEntity parseEntity(Result result) throws IOException {
+    if (result == null || result.isEmpty()) {
+      return null;
+    }
+    TimelineEntity entity = new TimelineEntity();
+    entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
+    String entityId = ApplicationColumn.ID.readResult(result).toString();
+    entity.setId(entityId);
+
+    // fetch created time
+    Number createdTime =
+        (Number)ApplicationColumn.CREATED_TIME.readResult(result);
+    entity.setCreatedTime(createdTime.longValue());
+    if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin ||
+        entity.getCreatedTime() > createdTimeEnd)) {
+      return null;
+    }
+
+    // fetch modified time
+    Number modifiedTime =
+        (Number)ApplicationColumn.MODIFIED_TIME.readResult(result);
+    entity.setModifiedTime(modifiedTime.longValue());
+    if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
+        entity.getModifiedTime() > modifiedTimeEnd)) {
+      return null;
+    }
+
+    // For each optional field below: read it if it was requested or is needed
+    // to evaluate a filter; if it was read only for filtering, clear it
+    // before returning so the caller sees only the fields it asked for.
+
+    // fetch is related to entities
+    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
+      readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
+          true);
+      if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
+          entity.getIsRelatedToEntities(), isRelatedTo)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
+        entity.getIsRelatedToEntities().clear();
+      }
+    }
+
+    // fetch relates to entities
+    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
+      readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
+          false);
+      if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
+          entity.getRelatesToEntities(), relatesTo)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.RELATES_TO)) {
+        entity.getRelatesToEntities().clear();
+      }
+    }
+
+    // fetch info
+    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
+      readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
+      if (checkInfo &&
+          !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.INFO)) {
+        entity.getInfo().clear();
+      }
+    }
+
+    // fetch configs
+    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
+      readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
+      if (checkConfigs && !TimelineReaderUtils.matchFilters(
+          entity.getConfigs(), configFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.CONFIGS)) {
+        entity.getConfigs().clear();
+      }
+    }
+
+    // fetch events
+    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
+      readEvents(entity, result, true);
+      if (checkEvents && !TimelineReaderUtils.matchEventFilters(
+          entity.getEvents(), eventFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.EVENTS)) {
+        entity.getEvents().clear();
+      }
+    }
+
+    // fetch metrics
+    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
+      readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
+      if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
+          entity.getMetrics(), metricFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.METRICS)) {
+        entity.getMetrics().clear();
+      }
+    }
+    return entity;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
new file mode 100644
index 0000000..d5ece2e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Timeline entity reader for flow activity entities that are stored in the
+ * flow activity table.
+ */
+class FlowActivityEntityReader extends TimelineEntityReader {
+  // Stateless table descriptor shared by all reader instances.
+  private static final FlowActivityTable FLOW_ACTIVITY_TABLE =
+      new FlowActivityTable();
+
+  /**
+   * Constructor for a multiple-entity query. As implemented here, only
+   * {@code clusterId} and {@code limit} are consulted by this reader; the
+   * remaining filter arguments are accepted for interface uniformity with
+   * the other readers.
+   */
+  public FlowActivityEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
+        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
+        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
+        eventFilters, fieldsToRetrieve);
+  }
+
+  /**
+   * Constructor for a single-entity query; see {@link #getResult} — single
+   * flow activity lookups are not supported by this reader.
+   */
+  public FlowActivityEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
+        fieldsToRetrieve);
+  }
+
+  /**
+   * Uses the {@link FlowActivityTable}.
+   */
+  @Override
+  protected BaseTable<?> getTable() {
+    return FLOW_ACTIVITY_TABLE;
+  }
+
+  /**
+   * Since this is strictly sorted by the row key, it is sufficient to collect
+   * the first results as specified by the limit.
+   */
+  @Override
+  public Set<TimelineEntity> readEntities(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    validateParams();
+    augmentParams(hbaseConf, conn);
+
+    // TreeSet keeps the entities in their natural order while we fill up to
+    // the limit; the scanner may return more rows than the limit (see
+    // getResults), hence the explicit size check in the loop.
+    NavigableSet<TimelineEntity> entities = new TreeSet<>();
+    Iterable<Result> results = getResults(hbaseConf, conn);
+    for (Result result : results) {
+      TimelineEntity entity = parseEntity(result);
+      if (entity == null) {
+        continue;
+      }
+      entities.add(entity);
+      if (entities.size() == limit) {
+        break;
+      }
+    }
+    return entities;
+  }
+
+  @Override
+  protected void validateParams() {
+    // Only the cluster is required; the scan covers all flows in the cluster.
+    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
+  }
+
+  @Override
+  protected void augmentParams(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    // Fall back to the default page size when no (valid) limit was given.
+    if (limit == null || limit < 0) {
+      limit = TimelineReader.DEFAULT_LIMIT;
+    }
+  }
+
+  @Override
+  protected Result getResult(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    // There is no meaningful single-row lookup for flow activity.
+    throw new UnsupportedOperationException(
+        "we don't support a single entity query");
+  }
+
+  @Override
+  protected Iterable<Result> getResults(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    Scan scan = new Scan();
+    scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId));
+    // use the page filter to limit the result to the page size
+    // the scanner may still return more than the limit; therefore we need to
+    // read the right number as we iterate
+    scan.setFilter(new PageFilter(limit));
+    return table.getResultScanner(hbaseConf, conn, scan);
+  }
+
+  /**
+   * Builds a {@link FlowActivityEntity} from one flow activity row: the day
+   * timestamp, user and flow name come from the row key, and each RUN_ID
+   * column (run id -> flow version) becomes a child {@link FlowRunEntity}.
+   */
+  @Override
+  protected TimelineEntity parseEntity(Result result) throws IOException {
+    FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow());
+
+    long time = rowKey.getDayTimestamp();
+    String user = rowKey.getUserId();
+    String flowName = rowKey.getFlowId();
+
+    FlowActivityEntity flowActivity =
+        new FlowActivityEntity(clusterId, time, user, flowName);
+    // set the id
+    // (presumably getId() derives the id string from the fields above and
+    // setId stores it in the base entity's id field — confirm against
+    // FlowActivityEntity)
+    flowActivity.setId(flowActivity.getId());
+    // get the list of run ids along with the version that are associated with
+    // this flow on this day
+    Map<String, Object> runIdsMap =
+        FlowActivityColumnPrefix.RUN_ID.readResults(result);
+    for (Map.Entry<String, Object> e : runIdsMap.entrySet()) {
+      Long runId = Long.valueOf(e.getKey());
+      String version = (String)e.getValue();
+      FlowRunEntity flowRun = new FlowRunEntity();
+      flowRun.setUser(user);
+      flowRun.setName(flowName);
+      flowRun.setRunId(runId);
+      flowRun.setVersion(version);
+      // set the id
+      flowRun.setId(flowRun.getId());
+      flowActivity.addFlowRun(flowRun);
+    }
+
+    return flowActivity;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
new file mode 100644
index 0000000..ced795d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Timeline entity reader for flow run entities that are stored in the flow run
+ * table.
+ */
+class FlowRunEntityReader extends TimelineEntityReader {
+  // Stateless table descriptor shared by all reader instances.
+  private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable();
+
+  /**
+   * Constructor for a multiple-entity query. Note that multiple-entity reads
+   * are not supported by this reader (see {@link #getResults}); this
+   * constructor exists for interface uniformity with the other readers.
+   */
+  public FlowRunEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
+        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
+        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
+        eventFilters, fieldsToRetrieve);
+  }
+
+  /**
+   * Constructor for a single-entity query; the flow run is identified by the
+   * (cluster, user, flow, run id) tuple rather than by {@code entityId}.
+   */
+  public FlowRunEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
+        fieldsToRetrieve);
+  }
+
+  /**
+   * Uses the {@link FlowRunTable}.
+   */
+  @Override
+  protected BaseTable<?> getTable() {
+    return FLOW_RUN_TABLE;
+  }
+
+  @Override
+  protected void validateParams() {
+    // All four components of the flow run row key must be present.
+    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
+    Preconditions.checkNotNull(userId, "userId shouldn't be null");
+    Preconditions.checkNotNull(flowId, "flowId shouldn't be null");
+    Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null");
+  }
+
+  @Override
+  protected void augmentParams(Configuration hbaseConf, Connection conn) {
+    // No defaults to fill in for a flow run lookup.
+  }
+
+  @Override
+  protected Result getResult(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    // One flow run maps to exactly one row; request all cell versions so the
+    // metric time series can be read.
+    byte[] rowKey =
+        FlowRunRowKey.getRowKey(clusterId, userId, flowId, flowRunId);
+    Get get = new Get(rowKey);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    return table.getResult(hbaseConf, conn, get);
+  }
+
+  @Override
+  protected Iterable<Result> getResults(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    throw new UnsupportedOperationException(
+        "multiple entity query is not supported");
+  }
+
+  /**
+   * Builds a {@link FlowRunEntity} from the single flow run row. Each column
+   * (start time, end time, version) is read null-tolerantly since a row may
+   * not yet have all of them populated.
+   */
+  @Override
+  protected TimelineEntity parseEntity(Result result) throws IOException {
+    FlowRunEntity flowRun = new FlowRunEntity();
+    flowRun.setUser(userId);
+    flowRun.setName(flowId);
+    flowRun.setRunId(flowRunId);
+
+    // read the start time
+    Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result);
+    if (startTime != null) {
+      flowRun.setStartTime(startTime);
+    }
+    // read the end time if available
+    Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result);
+    if (endTime != null) {
+      flowRun.setMaxEndTime(endTime);
+    }
+
+    // read the flow version
+    String version = (String)FlowRunColumn.FLOW_VERSION.readResult(result);
+    if (version != null) {
+      flowRun.setVersion(version);
+    }
+
+    // read metrics
+    readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC);
+
+    // set the id
+    // (presumably getId() derives the id string from user/name/runId and
+    // setId stores it in the base entity's id field — confirm against
+    // FlowRunEntity)
+    flowRun.setId(flowRun.getId());
+    return flowRun;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
new file mode 100644
index 0000000..466914b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Timeline entity reader for generic entities that are stored in the entity
+ * table.
+ */
+class GenericEntityReader extends TimelineEntityReader {
+  private static final EntityTable ENTITY_TABLE = new EntityTable();
+  private static final Log LOG = LogFactory.getLog(GenericEntityReader.class);
+
+  private static final long DEFAULT_BEGIN_TIME = 0L;
+  private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
+
+  /**
+   * Used to look up the flow context.
+   */
+  private final AppToFlowTable appToFlowTable = new AppToFlowTable();
+
+  public GenericEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
+        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
+        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
+        eventFilters, fieldsToRetrieve);
+  }
+
+  public GenericEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
+        fieldsToRetrieve);
+  }
+
+  /**
+   * Uses the {@link EntityTable}.
+   */
+  protected BaseTable<?> getTable() {
+    return ENTITY_TABLE;
+  }
+
+  private FlowContext lookupFlowContext(String clusterId, String appId,
+      Configuration hbaseConf, Connection conn) throws IOException {
+    byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
+    Get get = new Get(rowKey);
+    Result result = appToFlowTable.getResult(hbaseConf, conn, get);
+    if (result != null && !result.isEmpty()) {
+      return new FlowContext(
+          AppToFlowColumn.FLOW_ID.readResult(result).toString(),
+          ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
+    } else {
+       throw new IOException(
+           "Unable to find the context flow ID and flow run ID for clusterId=" +
+           clusterId + ", appId=" + appId);
+    }
+  }
+
+  private static class FlowContext {
+    private final String flowId;
+    private final Long flowRunId;
+    public FlowContext(String flowId, Long flowRunId) {
+      this.flowId = flowId;
+      this.flowRunId = flowRunId;
+    }
+  }
+
+  @Override
+  protected void validateParams() {
+    Preconditions.checkNotNull(userId, "userId shouldn't be null");
+    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
+    Preconditions.checkNotNull(appId, "appId shouldn't be null");
+    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
+    if (singleEntityRead) {
+      Preconditions.checkNotNull(entityId, "entityId shouldn't be null");
+    }
+  }
+
+  @Override
+  protected void augmentParams(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    // In reality both should be null or neither should be null
+    if (flowId == null || flowRunId == null) {
+      FlowContext context =
+          lookupFlowContext(clusterId, appId, hbaseConf, conn);
+      flowId = context.flowId;
+      flowRunId = context.flowRunId;
+    }
+    if (fieldsToRetrieve == null) {
+      fieldsToRetrieve = EnumSet.noneOf(Field.class);
+    }
+    if (!singleEntityRead) {
+      if (limit == null || limit < 0) {
+        limit = TimelineReader.DEFAULT_LIMIT;
+      }
+      if (createdTimeBegin == null) {
+        createdTimeBegin = DEFAULT_BEGIN_TIME;
+      }
+      if (createdTimeEnd == null) {
+        createdTimeEnd = DEFAULT_END_TIME;
+      }
+      if (modifiedTimeBegin == null) {
+        modifiedTimeBegin = DEFAULT_BEGIN_TIME;
+      }
+      if (modifiedTimeEnd == null) {
+        modifiedTimeEnd = DEFAULT_END_TIME;
+      }
+    }
+  }
+
+  @Override
+  protected Result getResult(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    byte[] rowKey =
+        EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId,
+            entityType, entityId);
+    Get get = new Get(rowKey);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    return table.getResult(hbaseConf, conn, get);
+  }
+
+  @Override
+  protected Iterable<Result> getResults(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    // Multi-entity read: prefix-scan the rows that belong to one application
+    // and one entity type.
+    byte[] prefix = EntityRowKey.getRowKeyPrefix(
+        clusterId, userId, flowId, flowRunId, appId, entityType);
+    Scan scan = new Scan();
+    scan.setRowPrefixFilter(prefix);
+    // Request every cell version so multi-version columns are fully read.
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    return table.getResultScanner(hbaseConf, conn, scan);
+  }
+
+  /**
+   * Converts one HBase row into a {@link TimelineEntity}, applying this
+   * reader's filters and the requested field set.
+   *
+   * For multi-entity reads, a null return means the row was filtered out
+   * (created/modified time window, or one of the relation/info/config/
+   * event/metric filters did not match). Filters are skipped entirely for
+   * single-entity reads.
+   *
+   * Each optional field follows the same pattern: it is read if explicitly
+   * requested via fieldsToRetrieve OR if a filter needs it for matching;
+   * when it was read only for filtering, it is cleared from the entity
+   * before returning.
+   */
+  @Override
+  protected TimelineEntity parseEntity(Result result) throws IOException {
+    if (result == null || result.isEmpty()) {
+      return null;
+    }
+    TimelineEntity entity = new TimelineEntity();
+    String entityType = EntityColumn.TYPE.readResult(result).toString();
+    entity.setType(entityType);
+    String entityId = EntityColumn.ID.readResult(result).toString();
+    entity.setId(entityId);
+
+    // fetch created time; drop the row if outside the requested window
+    Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result);
+    entity.setCreatedTime(createdTime.longValue());
+    if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin ||
+        entity.getCreatedTime() > createdTimeEnd)) {
+      return null;
+    }
+
+    // fetch modified time; drop the row if outside the requested window
+    Number modifiedTime = (Number)EntityColumn.MODIFIED_TIME.readResult(result);
+    entity.setModifiedTime(modifiedTime.longValue());
+    if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
+        entity.getModifiedTime() > modifiedTimeEnd)) {
+      return null;
+    }
+
+    // fetch is related to entities (read if requested or needed for a filter)
+    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
+      readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
+      if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
+          entity.getIsRelatedToEntities(), isRelatedTo)) {
+        return null;
+      }
+      // clear the data if it was fetched only to evaluate the filter
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
+        entity.getIsRelatedToEntities().clear();
+      }
+    }
+
+    // fetch relates to entities (read if requested or needed for a filter)
+    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
+      readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
+      if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
+          entity.getRelatesToEntities(), relatesTo)) {
+        return null;
+      }
+      // clear the data if it was fetched only to evaluate the filter
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.RELATES_TO)) {
+        entity.getRelatesToEntities().clear();
+      }
+    }
+
+    // fetch info (read if requested or needed for a filter)
+    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
+      readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
+      if (checkInfo &&
+          !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+        return null;
+      }
+      // clear the data if it was fetched only to evaluate the filter
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.INFO)) {
+        entity.getInfo().clear();
+      }
+    }
+
+    // fetch configs (read if requested or needed for a filter)
+    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
+      readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
+      if (checkConfigs && !TimelineReaderUtils.matchFilters(
+          entity.getConfigs(), configFilters)) {
+        return null;
+      }
+      // clear the data if it was fetched only to evaluate the filter
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.CONFIGS)) {
+        entity.getConfigs().clear();
+      }
+    }
+
+    // fetch events (read if requested or needed for a filter)
+    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
+      readEvents(entity, result, false);
+      if (checkEvents && !TimelineReaderUtils.matchEventFilters(
+          entity.getEvents(), eventFilters)) {
+        return null;
+      }
+      // clear the data if it was fetched only to evaluate the filter
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.EVENTS)) {
+        entity.getEvents().clear();
+      }
+    }
+
+    // fetch metrics (read if requested or needed for a filter)
+    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
+      readMetrics(entity, result, EntityColumnPrefix.METRIC);
+      if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
+          entity.getMetrics(), metricFilters)) {
+        return null;
+      }
+      // clear the data if it was fetched only to evaluate the filter
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.METRICS)) {
+        entity.getMetrics().clear();
+      }
+    }
+    return entity;
+  }
+
+  /**
+   * Reads relationship columns and populates either the entity's isRelatedTo
+   * map or its relatesTo map. Each column maps an entity type to a
+   * Separator-encoded list of entity ids.
+   *
+   * @param isRelatedTo if true populate isRelatedTo, otherwise relatesTo
+   */
+  protected <T> void readRelationship(
+      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
+      boolean isRelatedTo) throws IOException {
+    Map<String, Object> columns = prefix.readResults(result);
+    for (Map.Entry<String, Object> column : columns.entrySet()) {
+      String relatedEntityType = column.getKey();
+      for (String relatedEntityId :
+          Separator.VALUES.splitEncoded(column.getValue().toString())) {
+        if (isRelatedTo) {
+          entity.addIsRelatedToEntity(relatedEntityType, relatedEntityId);
+        } else {
+          entity.addRelatesToEntity(relatedEntityType, relatedEntityId);
+        }
+      }
+    }
+  }
+
+  /**
+   * Reads a key-value column family into either the entity's config map
+   * (values converted to strings) or its info map (values kept as objects).
+   *
+   * @param isConfig if true populate configs, otherwise info
+   */
+  protected <T> void readKeyValuePairs(
+      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
+      boolean isConfig) throws IOException {
+    Map<String, Object> columns = prefix.readResults(result);
+    if (!isConfig) {
+      // Info values retain whatever type they were deserialized as.
+      entity.addInfo(columns);
+      return;
+    }
+    // Config values are always strings.
+    for (Map.Entry<String, Object> column : columns.entrySet()) {
+      entity.addConfig(column.getKey(), column.getValue().toString());
+    }
+  }
+
+  /**
+   * Read events from the entity table or the application table. The column
+   * name is of the form "eventId=timestamp=infoKey" where "infoKey" may be
+   * omitted if there is no info associated with the event.
+   *
+   * See {@link EntityTable} and {@link ApplicationTable} for a more detailed
+   * schema description.
+   *
+   * @param entity entity to which the parsed events are added
+   * @param result HBase row containing the event cells
+   * @param isApplication true to use the application-table column prefix,
+   *          false to use the entity-table column prefix
+   */
+  protected void readEvents(TimelineEntity entity, Result result,
+      boolean isApplication) throws IOException {
+    // Cells belonging to the same event (same id + timestamp) are grouped
+    // under one key so multiple info entries land on a single TimelineEvent.
+    Map<String, TimelineEvent> eventsMap = new HashMap<>();
+    Map<?, Object> eventsResult = isApplication ?
+        ApplicationColumnPrefix.EVENT.
+            readResultsHavingCompoundColumnQualifiers(result) :
+        EntityColumnPrefix.EVENT.
+            readResultsHavingCompoundColumnQualifiers(result);
+    for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
+      byte[][] karr = (byte[][])eventResult.getKey();
+      // the column name is of the form "eventId=timestamp=infoKey"
+      if (karr.length != 3) {
+        // Guard clause (the original trailing "continue" in the else branch
+        // was redundant): skip malformed qualifiers instead of failing the
+        // whole read.
+        LOG.warn("incorrectly formatted column name: it will be discarded");
+        continue;
+      }
+      String id = Bytes.toString(karr[0]);
+      // The timestamp is stored in inverted form; undo that here.
+      long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1]));
+      String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
+      TimelineEvent event = eventsMap.get(key);
+      if (event == null) {
+        event = new TimelineEvent();
+        event.setId(id);
+        event.setTimestamp(ts);
+        eventsMap.put(key, event);
+      }
+      // An empty third segment means the event carries no info entry.
+      if (karr[2].length != 0) {
+        event.addInfo(Bytes.toString(karr[2]), eventResult.getValue());
+      }
+    }
+    entity.addEvents(new HashSet<>(eventsMap.values()));
+  }
+}