You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2016/03/18 02:22:28 UTC

[2/2] hadoop git commit: YARN-4062. Add the flush and compaction functionality via coprocessors and scanners for flow run table (Vrushali C via sjlee)

YARN-4062. Add the flush and compaction functionality via coprocessors and scanners for flow run table (Vrushali C via sjlee)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bc698197
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bc698197
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bc698197

Branch: refs/heads/YARN-2928
Commit: bc698197cde0f40e6e85a9fb1a11f1f92952e91e
Parents: c6f4c51
Author: Sangjin Lee <sj...@apache.org>
Authored: Thu Mar 17 18:22:04 2016 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Thu Mar 17 18:22:04 2016 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  16 +
 .../src/main/resources/yarn-default.xml         |  10 +
 .../storage/HBaseTimelineWriterImpl.java        |   5 +-
 .../storage/common/TimelineStorageUtils.java    |  55 ++
 .../storage/common/TimestampGenerator.java      |  13 +-
 .../storage/flow/AggregationOperation.java      |  17 +-
 .../storage/flow/FlowRunColumn.java             |   4 +-
 .../storage/flow/FlowRunColumnPrefix.java       |   2 +-
 .../storage/flow/FlowRunCoprocessor.java        |  70 +-
 .../storage/flow/FlowRunRowKey.java             |  16 +
 .../storage/flow/FlowScanner.java               | 269 ++++++--
 .../storage/flow/FlowScannerOperation.java      |  46 ++
 .../storage/flow/TestFlowDataGenerator.java     | 178 +++++-
 .../storage/flow/TestHBaseStorageFlowRun.java   | 112 +++-
 .../flow/TestHBaseStorageFlowRunCompaction.java | 635 +++++++++++++++++++
 16 files changed, 1365 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 4b7fd2c..762e43c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -127,6 +127,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-4179. [reader implementation] support flow activity queries based on
     time (Varun Saxena via sjlee)
 
+    YARN-4062. Add the flush and compaction functionality via coprocessors and
+    scanners for flow run table (Vrushali C via sjlee)
+
   IMPROVEMENTS
 
     YARN-4224. Support fetching entities by UID and change the REST interface 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 6ac6fb9..863b5a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1757,6 +1757,22 @@ public class YarnConfiguration extends Configuration {
   public static final int
       DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60;
 
+  /**
+   * The name of the setting that controls how long the final value of
+   * a metric of a completed app is retained before merging
+   * into the flow sum.
+   */
+  public static final String APP_FINAL_VALUE_RETENTION_THRESHOLD =
+      TIMELINE_SERVICE_PREFIX
+      + "coprocessor.app-final-value-retention-milliseconds";
+
+  /**
+   * Default (3 days in milliseconds) for how long the final value of a metric
+   * of a completed app is retained before merging into the flow sum.
+   */
+  public static final long DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD = 3 * 24
+      * 60 * 60 * 1000L;
+
   public static final String ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS =
       TIMELINE_SERVICE_PREFIX + "app-collector.linger-period.ms";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 2cbc836..31b897b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2067,6 +2067,7 @@
     <value>604800</value>
   </property>
 
+  <!-- Timeline Service v2 Configuration -->
   <property>
     <description>The setting that controls how often the timeline collector
     flushes the timeline writer.</description>
@@ -2088,6 +2089,15 @@
     <name>yarn.timeline-service.timeline-client.number-of-async-entities-to-merge</name>
     <value>10</value>
   </property>
+
+  <property>
+    <description> The setting that controls how long the final value
+    of a metric of a completed app is retained before merging into
+    the flow sum.</description>
+    <name>yarn.timeline-service.coprocessor.app-final-value-retention-milliseconds</name>
+    <value>259200000</value>
+  </property>
+
   <!--  Shared Cache Configuration -->
 
   <property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 1afe878..b75007d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -261,7 +261,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
           flowRunId);
       storeFlowMetrics(rowKey, metrics,
-          AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+          AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId),
+          AggregationOperation.SUM.getAttribute());
     }
   }
 
@@ -500,4 +501,4 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     super.serviceStop();
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index 605dbe7..b5fc214 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -24,9 +24,13 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
+import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -475,4 +479,55 @@ public final class TimelineStorageUtils {
     return (obj instanceof Short) || (obj instanceof Integer) ||
         (obj instanceof Long);
   }
+
+  /**
+   * Creates a new cell based on the input cell but with the new value.
+   *
+   * @param origCell Original cell
+   * @param newValue new cell value
+   * @return cell
+   * @throws IOException while creating new cell.
+   */
+  public static Cell createNewCell(Cell origCell, byte[] newValue)
+      throws IOException {
+    return CellUtil.createCell(CellUtil.cloneRow(origCell),
+        CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
+        origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
+  }
+
+  /**
+   * Creates a cell with the given inputs.
+   *
+   * @param row row of the cell to be created
+   * @param family column family name of the new cell
+   * @param qualifier qualifier for the new cell
+   * @param ts timestamp of the new cell
+   * @param newValue value of the new cell
+   * @param tags tags in the new cell
+   * @return cell
+   * @throws IOException while creating the cell.
+   */
+  public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier,
+      long ts, byte[] newValue, byte[] tags) throws IOException {
+    return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put,
+        newValue, tags);
+  }
+
+  /**
+   * Returns the app id from the list of tags.
+   *
+   * @param tags cell tags to be looked into
+   * @return App Id as the AggregationCompactionDimension, or null if absent
+   */
+  public static String getAggregationCompactionDimension(List<Tag> tags) {
+    String appId = null;
+    for (Tag t : tags) {
+      if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
+          .getType()) {
+        appId = Bytes.toString(t.getValue());
+        return appId;
+      }
+    }
+    return appId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
index 7238efa..288046c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
@@ -33,7 +33,7 @@ public class TimestampGenerator {
    * if this is changed, then reading cell timestamps written with older
    * multiplier value will not work
    */
-  public static final long TS_MULTIPLIER = 1000L;
+  public static final long TS_MULTIPLIER = 1000000L;
 
   private final AtomicLong lastTimestamp = new AtomicLong();
 
@@ -74,13 +74,14 @@ public class TimestampGenerator {
   }
 
   /**
-   * returns a timestamp multiplied with TS_MULTIPLIER and last few digits of
-   * application id
+   * Returns a timestamp multiplied with TS_MULTIPLIER and last few digits of
+   * application id.
    *
    * Unlikely scenario of generating a timestamp that is a duplicate: If more
-   * than a 1000 concurrent apps are running in one flow run AND write to same
-   * column at the same time, then say appId of 1001 will overlap with appId of
-   * 001 and there may be collisions for that flow run's specific column.
+   * than 1M concurrent apps are running in one flow run AND write to the same
+   * column at the same time, then, say, an appId of 1,000,001 will overlap
+   * with an appId of 1 and there may be collisions for that flow run's
+   * specific column.
    *
    * @param incomingTS Timestamp to be converted.
    * @param appId Application Id.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
index 6240e81..40cdd2c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
@@ -21,19 +21,19 @@ import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * Identifies the attributes to be set for puts into the {@link FlowRunTable}.
- * The numbers used for tagType are prime numbers
+ * The numbers used for tagType are prime numbers.
  */
 public enum AggregationOperation {
 
   /**
    * When the flow was started.
    */
-  MIN((byte) 71),
+  GLOBAL_MIN((byte) 71),
 
   /**
    * When it ended.
    */
-  MAX((byte) 73),
+  GLOBAL_MAX((byte) 73),
 
   /**
    * The metrics of the flow.
@@ -46,9 +46,16 @@ public enum AggregationOperation {
   SUM_FINAL((byte) 83),
 
   /**
-   * compact.
+   * Min value as per the latest timestamp
+   * seen for a given app.
    */
-  COMPACT((byte) 89);
+  LATEST_MIN((byte) 89),
+
+  /**
+   * Max value as per the latest timestamp
+   * seen for a given app.
+   */
+  LATEST_MAX((byte) 97);
 
   private byte tagType;
   private byte[] inBytes;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
index 148a37f..d50bb16 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -41,14 +41,14 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
    * application start times.
    */
   MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time",
-      AggregationOperation.MIN, LongConverter.getInstance()),
+      AggregationOperation.GLOBAL_MIN, LongConverter.getInstance()),
 
   /**
    * When the flow ended. This is the maximum of currently known application end
    * times.
    */
   MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time",
-      AggregationOperation.MAX, LongConverter.getInstance()),
+      AggregationOperation.GLOBAL_MAX, LongConverter.getInstance()),
 
   /**
    * The version of the flow that this flow belongs to.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index 3d7c40e..fa94aae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -40,7 +40,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
   /**
    * To store flow run info values.
    */
-  METRIC(FlowRunColumnFamily.INFO, "m", AggregationOperation.SUM,
+  METRIC(FlowRunColumnFamily.INFO, "m", null,
       LongConverter.getInstance());
 
   private final ColumnHelper<FlowRunTable> column;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index 9698f06..450640a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -40,7 +40,12 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
@@ -51,7 +56,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGen
  */
 public class FlowRunCoprocessor extends BaseRegionObserver {
 
-  @SuppressWarnings("unused")
   private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);
 
   private HRegion region;
@@ -160,8 +164,8 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
     scan.setMaxVersions();
     RegionScanner scanner = null;
     try {
-      scanner = new FlowScanner(region, scan.getBatch(),
-          region.getScanner(scan));
+      scanner = new FlowScanner(e.getEnvironment(), scan.getBatch(),
+          region.getScanner(scan), FlowScannerOperation.READ);
       scanner.next(results);
       e.bypass();
     } finally {
@@ -209,6 +213,64 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
   public RegionScanner postScannerOpen(
       ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
       RegionScanner scanner) throws IOException {
-    return new FlowScanner(region, scan.getBatch(), scanner);
+    return new FlowScanner(e.getEnvironment(), scan.getBatch(),
+        scanner, FlowScannerOperation.READ);
+  }
+
+  @Override
+  public InternalScanner preFlush(
+      ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      InternalScanner scanner) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      if (store != null) {
+        LOG.debug("preFlush store = " + store.getColumnFamilyName()
+            + " flushableSize=" + store.getFlushableSize()
+            + " flushedCellsCount=" + store.getFlushedCellsCount()
+            + " compactedCellsCount=" + store.getCompactedCellsCount()
+            + " majorCompactedCellsCount="
+            + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
+            + store.getMemstoreFlushSize() + " memstoreSize="
+            + store.getMemStoreSize() + " size=" + store.getSize()
+            + " storeFilesCount=" + store.getStorefilesCount());
+      }
+    }
+    return new FlowScanner(c.getEnvironment(), -1, scanner,
+        FlowScannerOperation.FLUSH);
+  }
+
+  @Override
+  public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
+      Store store, StoreFile resultFile) {
+    if (LOG.isDebugEnabled()) {
+      if (store != null) {
+        LOG.debug("postFlush store = " + store.getColumnFamilyName()
+            + " flushableSize=" + store.getFlushableSize()
+            + " flushedCellsCount=" + store.getFlushedCellsCount()
+            + " compactedCellsCount=" + store.getCompactedCellsCount()
+            + " majorCompactedCellsCount="
+            + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
+            + store.getMemstoreFlushSize() + " memstoreSize="
+            + store.getMemStoreSize() + " size=" + store.getSize()
+            + " storeFilesCount=" + store.getStorefilesCount());
+      }
+    }
+  }
+
+  @Override
+  public InternalScanner preCompact(
+      ObserverContext<RegionCoprocessorEnvironment> e, Store store,
+      InternalScanner scanner, ScanType scanType, CompactionRequest request)
+      throws IOException {
+
+    FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION;
+    if (request != null) {
+      requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
+          : FlowScannerOperation.MINOR_COMPACTION);
+      LOG.info("Compactionrequest= " + request.toString() + " "
+          + requestOp.toString() + " RegionName="
+          + e.getEnvironment().getRegion().getRegionNameAsString());
+    }
+
+    return new FlowScanner(e.getEnvironment(), -1, scanner, requestOp);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
index 0585dc9..eac8f05 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -113,4 +113,20 @@ public class FlowRunRowKey {
         TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
     return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
   }
+
+  /**
+   * Returns the Flow Key as a verbose String output.
+   * @return String
+   */
+  @Override
+  public String toString() {
+    StringBuilder flowKeyStr = new StringBuilder();
+    flowKeyStr.append("{clusterId=" + clusterId);
+    flowKeyStr.append(" userId=" + userId);
+    flowKeyStr.append(" flowName=" + flowName);
+    flowKeyStr.append(" flowRunId=");
+    flowKeyStr.append(flowRunId);
+    flowKeyStr.append("}");
+    return flowKeyStr.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index 6fefd15..6baea37 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -29,20 +29,26 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Invoked via the coprocessor when a Get or a Scan is issued for flow run
@@ -55,23 +61,42 @@ class FlowScanner implements RegionScanner, Closeable {
 
   private static final Log LOG = LogFactory.getLog(FlowScanner.class);
 
+  /**
+   * Use a special application id to represent the flow id. This is needed
+   * since TimestampGenerator parses the app id to generate a cell timestamp.
+   */
+  private static final String FLOW_APP_ID = "application_00000000000_0000";
+
   private final HRegion region;
   private final InternalScanner flowRunScanner;
-  private RegionScanner regionScanner;
   private final int limit;
+  private final long appFinalValueRetentionThreshold;
+  private RegionScanner regionScanner;
   private boolean hasMore;
   private byte[] currentRow;
   private List<Cell> availableCells = new ArrayList<>();
   private int currentIndex;
+  private FlowScannerOperation action = FlowScannerOperation.READ;
 
-  FlowScanner(HRegion region, int limit, InternalScanner internalScanner) {
-    this.region = region;
+  FlowScanner(RegionCoprocessorEnvironment env, int limit,
+      InternalScanner internalScanner, FlowScannerOperation action) {
     this.limit = limit;
     this.flowRunScanner = internalScanner;
     if (internalScanner instanceof RegionScanner) {
       this.regionScanner = (RegionScanner) internalScanner;
     }
-    // TODO: note if it's compaction/flush
+    this.action = action;
+    if (env == null) {
+      this.appFinalValueRetentionThreshold =
+          YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD;
+      this.region = null;
+    } else {
+      this.region = env.getRegion();
+      Configuration hbaseConf = env.getConfiguration();
+      this.appFinalValueRetentionThreshold = hbaseConf.getLong(
+          YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD,
+          YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD);
+    }
   }
 
   /*
@@ -104,17 +129,6 @@ class FlowScanner implements RegionScanner, Closeable {
     return nextInternal(cells, cellLimit);
   }
 
-  private String getAggregationCompactionDimension(List<Tag> tags) {
-    String appId = null;
-    for (Tag t : tags) {
-      if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
-          .getType()) {
-        appId = Bytes.toString(t.getValue());
-      }
-    }
-    return appId;
-  }
-
   /**
    * Get value converter associated with a column or a column prefix. If nothing
    * matches, generic converter is returned.
@@ -165,6 +179,7 @@ class FlowScanner implements RegionScanner, Closeable {
    * @return true if next row is available for the scanner, false otherwise
    * @throws IOException
    */
+  @SuppressWarnings("deprecation")
   private boolean nextInternal(List<Cell> cells, int cellLimit)
       throws IOException {
     Cell cell = null;
@@ -183,14 +198,18 @@ class FlowScanner implements RegionScanner, Closeable {
     SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
     Set<String> alreadySeenAggDim = new HashSet<>();
     int addedCnt = 0;
+    long currentTimestamp = System.currentTimeMillis();
     ValueConverter converter = null;
-    while (((cell = peekAtNextCell(cellLimit)) != null)
-        && (cellLimit <= 0 || addedCnt < cellLimit)) {
+    while (cellLimit <= 0 || addedCnt < cellLimit) {
+      cell = peekAtNextCell(cellLimit);
+      if (cell == null) {
+        break;
+      }
       byte[] newColumnQualifier = CellUtil.cloneQualifier(cell);
       if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
         if (converter != null && isNumericConverter(converter)) {
           addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
-              (NumericValueConverter)converter);
+              (NumericValueConverter)converter, currentTimestamp);
         }
         resetState(currentColumnCells, alreadySeenAggDim);
         currentColumnQualifier = newColumnQualifier;
@@ -207,8 +226,17 @@ class FlowScanner implements RegionScanner, Closeable {
       nextCell(cellLimit);
     }
     if (!currentColumnCells.isEmpty()) {
-      emitCells(cells, currentColumnCells, currentAggOp,
-          (NumericValueConverter)converter);
+      addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
+          (NumericValueConverter)converter, currentTimestamp);
+      if (LOG.isDebugEnabled()) {
+        if (addedCnt > 0) {
+          LOG.debug("emitted cells. " + addedCnt + " for " + this.action
+              + " rowKey="
+              + FlowRunRowKey.parseRowKey(cells.get(0).getRow()).toString());
+        } else {
+          LOG.debug("emitted no cells for " + this.action);
+        }
+      }
     }
     return hasMore();
   }
@@ -247,7 +275,7 @@ class FlowScanner implements RegionScanner, Closeable {
     }
 
     switch (currentAggOp) {
-    case MIN:
+    case GLOBAL_MIN:
       if (currentColumnCells.size() == 0) {
         currentColumnCells.add(cell);
       } else {
@@ -260,7 +288,7 @@ class FlowScanner implements RegionScanner, Closeable {
         }
       }
       break;
-    case MAX:
+    case GLOBAL_MAX:
       if (currentColumnCells.size() == 0) {
         currentColumnCells.add(cell);
       } else {
@@ -275,16 +303,32 @@ class FlowScanner implements RegionScanner, Closeable {
       break;
     case SUM:
     case SUM_FINAL:
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("In collect cells "
+            + " FlowSannerOperation="
+            + this.action
+            + " currentAggOp="
+            + currentAggOp
+            + " cell qualifier="
+            + Bytes.toString(CellUtil.cloneQualifier(cell))
+            + " cell value= "
+            + (Number) converter.decodeValue(CellUtil.cloneValue(cell))
+            + " timestamp=" + cell.getTimestamp());
+      }
+
       // only if this app has not been seen yet, add to current column cells
       List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
           cell.getTagsLength());
-      String aggDim = getAggregationCompactionDimension(tags);
-
-      // If this agg dimension has already been seen, since they show up in
-      // sorted order, we drop the rest which are older. In other words, this
-      // cell is older than previously seen cells for that agg dim.
+      String aggDim = TimelineStorageUtils
+          .getAggregationCompactionDimension(tags);
       if (!alreadySeenAggDim.contains(aggDim)) {
-        // Not seen this agg dim, hence consider this cell in our working set
+        // if this agg dimension has already been seen,
+        // since they show up in sorted order
+        // we drop the rest which are older
+        // in other words, this cell is older than previously seen cells
+        // for that agg dim
+        // but when this agg dim is not seen,
+        // consider this cell in our working set
         currentColumnCells.add(cell);
         alreadySeenAggDim.add(aggDim);
       }
@@ -300,8 +344,8 @@ class FlowScanner implements RegionScanner, Closeable {
    * parameter.
    */
   private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
-      AggregationOperation currentAggOp, NumericValueConverter converter)
-      throws IOException {
+      AggregationOperation currentAggOp, NumericValueConverter converter,
+      long currentTimestamp) throws IOException {
     if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
       return 0;
     }
@@ -309,17 +353,36 @@ class FlowScanner implements RegionScanner, Closeable {
       cells.addAll(currentColumnCells);
       return currentColumnCells.size();
     }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("In emitCells " + this.action + " currentColumnCells size= "
+          + currentColumnCells.size() + " currentAggOp" + currentAggOp);
+    }
 
     switch (currentAggOp) {
-    case MIN:
-    case MAX:
+    case GLOBAL_MIN:
+    case GLOBAL_MAX:
       cells.addAll(currentColumnCells);
       return currentColumnCells.size();
     case SUM:
     case SUM_FINAL:
-      Cell sumCell = processSummation(currentColumnCells, converter);
-      cells.add(sumCell);
-      return 1;
+      switch (action) {
+      case FLUSH:
+      case MINOR_COMPACTION:
+        cells.addAll(currentColumnCells);
+        return currentColumnCells.size();
+      case READ:
+        Cell sumCell = processSummation(currentColumnCells, converter);
+        cells.add(sumCell);
+        return 1;
+      case MAJOR_COMPACTION:
+        List<Cell> finalCells = processSummationMajorCompaction(
+            currentColumnCells, converter, currentTimestamp);
+        cells.addAll(finalCells);
+        return finalCells.size();
+      default:
+        cells.addAll(currentColumnCells);
+        return currentColumnCells.size();
+      }
     default:
       cells.addAll(currentColumnCells);
       return currentColumnCells.size();
@@ -349,10 +412,122 @@ class FlowScanner implements RegionScanner, Closeable {
       sum = converter.add(sum, currentValue);
     }
     byte[] sumBytes = converter.encodeValue(sum);
-    Cell sumCell = createNewCell(mostRecentCell, sumBytes);
+    Cell sumCell = TimelineStorageUtils.createNewCell(mostRecentCell, sumBytes);
     return sumCell;
   }
 
+
+  /**
+   * Returns the cells to write back for one column during a major compaction:
+   *
+   * A) the latest cells for applications that haven't finished yet, or whose
+   * final value is still within the retention threshold
+   * B) one flow level summation cell, tagged with FLOW_APP_ID, that folds in
+   * the existing flow sum cell and every SUM_FINAL cell older than the
+   * retention threshold
+   *
+   * The summation cell is stamped with the wall clock time of the compaction
+   * (supplemented for FLOW_APP_ID), not with the timestamp of an input cell.
+   */
+  @VisibleForTesting
+  List<Cell> processSummationMajorCompaction(
+      SortedSet<Cell> currentColumnCells, NumericValueConverter converter,
+      long currentTimestamp) throws IOException {
+    Number sum = 0;
+    boolean summationDone = false;
+    List<Cell> finalCells = new ArrayList<Cell>();
+    if (currentColumnCells == null) {
+      return finalCells;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("In processSummationMajorCompaction,"
+          + " will drop cells older than " + currentTimestamp
+          + " CurrentColumnCells size=" + currentColumnCells.size());
+    }
+
+    for (Cell cell : currentColumnCells) {
+      AggregationOperation cellAggOp = getCurrentAggOp(cell);
+      List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+          cell.getTagsLength());
+      String appId = TimelineStorageUtils
+          .getAggregationCompactionDimension(tags);
+      // decode up front so that every branch works on this cell's own value
+      // and never on the value left over from the previous iteration
+      Number currentValue = (Number) converter.decodeValue(CellUtil
+          .cloneValue(cell));
+      // compare string contents: == only tests reference identity; calling
+      // equals() on the constant also tolerates a null appId
+      if (FLOW_APP_ID.equals(appId)) {
+        // this is the existing flow sum cell, always carry it forward
+        sum = converter.add(sum, currentValue);
+        summationDone = true;
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("reading flow app id sum=" + sum);
+        }
+      } else {
+        // read the timestamp truncated by the generator
+        long ts = TimestampGenerator.getTruncatedTimestamp(cell.getTimestamp());
+        if ((cellAggOp == AggregationOperation.SUM_FINAL)
+            && ((ts + this.appFinalValueRetentionThreshold)
+                < currentTimestamp)) {
+          sum = converter.add(sum, currentValue);
+          summationDone = true;
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("MAJOR COMPACTION loop sum= " + sum
+                + " discarding now: " + " qualifier="
+                + Bytes.toString(CellUtil.cloneQualifier(cell)) + " value="
+                + currentValue
+                + " timestamp=" + cell.getTimestamp() + " " + this.action);
+          }
+        } else {
+          // not a final value but it's the latest cell for this app
+          // so include this cell in the list of cells to write back
+          finalCells.add(cell);
+        }
+      }
+    }
+    if (summationDone) {
+      Cell anyCell = currentColumnCells.first();
+      List<Tag> tags = new ArrayList<Tag>();
+      Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+          Bytes.toBytes(FLOW_APP_ID));
+      tags.add(t);
+      t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
+          Bytes.toBytes(FLOW_APP_ID));
+      tags.add(t);
+      byte[] tagByteArray = Tag.fromList(tags);
+      Cell sumCell = TimelineStorageUtils.createNewCell(
+          CellUtil.cloneRow(anyCell),
+          CellUtil.cloneFamily(anyCell),
+          CellUtil.cloneQualifier(anyCell),
+          TimestampGenerator.getSupplementedTimestamp(
+              System.currentTimeMillis(), FLOW_APP_ID),
+              converter.encodeValue(sum), tagByteArray);
+      finalCells.add(sumCell);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("MAJOR COMPACTION final sum= " + sum + " for "
+            + Bytes.toString(CellUtil.cloneQualifier(sumCell))
+            + " " + this.action);
+      }
+      LOG.info("After major compaction for qualifier="
+          + Bytes.toString(CellUtil.cloneQualifier(sumCell))
+          + " with currentColumnCells.size="
+          + currentColumnCells.size()
+          + " returning finalCells.size=" + finalCells.size()
+          + " with sum=" + sum.longValue()
+          + " with cell timestamp " + sumCell.getTimestamp());
+    } else {
+      LOG.info("After major compaction for qualifier="
+          + " with currentColumnCells.size="
+          + currentColumnCells.size()
+          + " returning finalCells.size=" + finalCells.size()
+          + " with zero sum="
+          + sum.longValue());
+    }
+    return finalCells;
+  }
+
   /**
    * Determines which cell is to be returned based on the values in each cell
    * and the comparison operation MIN or MAX.
@@ -375,7 +550,7 @@ class FlowScanner implements RegionScanner, Closeable {
       Number currentCellValue = (Number) converter.decodeValue(CellUtil
           .cloneValue(currentCell));
       switch (currentAggOp) {
-      case MIN:
+      case GLOBAL_MIN:
         if (converter.compare(
             currentCellValue, previouslyChosenCellValue) < 0) {
           // new value is minimum, hence return this cell
@@ -384,7 +559,7 @@ class FlowScanner implements RegionScanner, Closeable {
           // previously chosen value is miniumum, hence return previous min cell
           return previouslyChosenCell;
         }
-      case MAX:
+      case GLOBAL_MAX:
         if (converter.compare(
             currentCellValue, previouslyChosenCellValue) > 0) {
           // new value is max, hence return this cell
@@ -402,16 +577,13 @@ class FlowScanner implements RegionScanner, Closeable {
     }
   }
 
-  private Cell createNewCell(Cell origCell, byte[] newValue)
-      throws IOException {
-    return CellUtil.createCell(CellUtil.cloneRow(origCell),
-        CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
-        origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
-  }
-
   @Override
   public void close() throws IOException {
-    flowRunScanner.close();
+    if (flowRunScanner != null) {
+      flowRunScanner.close();
+    } else {
+      LOG.warn("scanner close called but scanner is null");
+    }
   }
 
   /**
@@ -423,8 +595,6 @@ class FlowScanner implements RegionScanner, Closeable {
 
   /**
    * Returns whether or not the underlying scanner has more rows.
-   *
-   * @return true, if there are more cells to return, false otherwise.
    */
   public boolean hasMore() {
     return currentIndex < availableCells.size() ? true : hasMore;
@@ -440,8 +610,7 @@ class FlowScanner implements RegionScanner, Closeable {
    *          fetched by the wrapped scanner
    * @return the next available cell or null if no more cells are available for
    *         the current row
-   * @throws IOException if any problem is encountered while grabbing the next
-   *     cell.
+   * @throws IOException
    */
   public Cell nextCell(int cellLimit) throws IOException {
     Cell cell = peekAtNextCell(cellLimit);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
new file mode 100644
index 0000000..73c666f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+
+/**
+ * Identifies why a scanner was opened on the {@link FlowRunTable}.
+ */
+public enum FlowScannerOperation {
+
+  /**
+   * Scanner opened for reading, during preGet or preScan. FlowScanner emits
+   * one summed cell for each SUM / SUM_FINAL column in this mode.
+   */
+  READ,
+
+  /**
+   * Scanner opened during preFlush; SUM cells are emitted without summing.
+   */
+  FLUSH,
+
+  /**
+   * Scanner opened during minor compaction; SUM cells are emitted unsummed.
+   */
+  MINOR_COMPACTION,
+
+  /**
+   * Opened during major compaction; SUM_FINAL cells past retention are summed.
+   */
+  MAJOR_COMPACTION
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
index d45df57..9793ce6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
@@ -17,7 +17,6 @@
  */
 
 package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -29,17 +28,60 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * Generates the data/entities for the FlowRun and FlowActivity Tables
  */
 class TestFlowDataGenerator {
 
-  private final static String metric1 = "MAP_SLOT_MILLIS";
-  private final static String metric2 = "HDFS_BYTES_READ";
+  private static final String metric1 = "MAP_SLOT_MILLIS";
+  private static final String metric2 = "HDFS_BYTES_READ";
+  public static final long END_TS_INCR = 10000L;
+
+  static TimelineEntity getEntityMetricsApp1(long insertTs, Configuration c1) {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunMetrics_test";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    long cTime = 1425016501000L;
+    entity.setCreatedTime(cTime);
 
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = insertTs;
 
-  static TimelineEntity getEntityMetricsApp1() {
+    for (int k=1; k< 100 ; k++) {
+    metricValues.put(ts - k*200000, 20L);
+    }
+    metricValues.put(ts - 80000, 40L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+
+    TimelineMetric m2 = new TimelineMetric();
+    m2.setId(metric2);
+    metricValues = new HashMap<Long, Number>();
+    ts = System.currentTimeMillis();
+    for (int k=1; k< 100 ; k++) {
+      metricValues.put(ts - k*100000, 31L);
+    }
+
+    metricValues.put(ts - 80000, 57L);
+    m2.setType(Type.TIME_SERIES);
+    m2.setValues(metricValues);
+    metrics.add(m2);
+
+    entity.addMetrics(metrics);
+    return entity;
+  }
+
+
+  static TimelineEntity getEntityMetricsApp1Complete(long insertTs, Configuration c1) {
     TimelineEntity entity = new TimelineEntity();
     String id = "flowRunMetrics_test";
     String type = TimelineEntityType.YARN_APPLICATION.toString();
@@ -53,7 +95,48 @@ class TestFlowDataGenerator {
     TimelineMetric m1 = new TimelineMetric();
     m1.setId(metric1);
     Map<Long, Number> metricValues = new HashMap<Long, Number>();
-    long ts = System.currentTimeMillis();
+    long ts = insertTs;
+
+    metricValues.put(ts - 80000, 40L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+
+    TimelineMetric m2 = new TimelineMetric();
+    m2.setId(metric2);
+    metricValues = new HashMap<Long, Number>();
+    ts = insertTs;
+    metricValues.put(ts - 80000, 57L);
+    m2.setType(Type.TIME_SERIES);
+    m2.setValues(metricValues);
+    metrics.add(m2);
+
+    entity.addMetrics(metrics);
+
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    event.setTimestamp(insertTs);
+    event.addInfo("done", "insertTs=" + insertTs);
+    entity.addEvent(event);
+    return entity;
+  }
+
+
+  static TimelineEntity getEntityMetricsApp1(long insertTs) {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunMetrics_test";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    long cTime = 1425016501000L;
+    entity.setCreatedTime(cTime);
+
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = insertTs;
     metricValues.put(ts - 100000, 2L);
     metricValues.put(ts - 80000, 40L);
     m1.setType(Type.TIME_SERIES);
@@ -63,7 +146,7 @@ class TestFlowDataGenerator {
     TimelineMetric m2 = new TimelineMetric();
     m2.setId(metric2);
     metricValues = new HashMap<Long, Number>();
-    ts = System.currentTimeMillis();
+    ts = insertTs;
     metricValues.put(ts - 100000, 31L);
     metricValues.put(ts - 80000, 57L);
     m2.setType(Type.TIME_SERIES);
@@ -74,7 +157,8 @@ class TestFlowDataGenerator {
     return entity;
   }
 
-  static TimelineEntity getEntityMetricsApp2() {
+
+  static TimelineEntity getEntityMetricsApp2(long insertTs) {
     TimelineEntity entity = new TimelineEntity();
     String id = "flowRunMetrics_test";
     String type = TimelineEntityType.YARN_APPLICATION.toString();
@@ -87,7 +171,7 @@ class TestFlowDataGenerator {
     TimelineMetric m1 = new TimelineMetric();
     m1.setId(metric1);
     Map<Long, Number> metricValues = new HashMap<Long, Number>();
-    long ts = System.currentTimeMillis();
+    long ts = insertTs;
     metricValues.put(ts - 100000, 5L);
     metricValues.put(ts - 80000, 101L);
     m1.setType(Type.TIME_SERIES);
@@ -140,6 +224,55 @@ class TestFlowDataGenerator {
     return entity;
   }
 
+  /**
+   * Builds an application entity with two time series metrics plus a CREATED
+   * event at ts and a FINISHED event six hours after ts. endTs is not used.
+   */
+  static TimelineEntity getAFullEntity(long ts, long endTs) {
+    Map<Long, Number> slotMillisValues = new HashMap<Long, Number>();
+    slotMillisValues.put(ts - 120000, 100000000L);
+    slotMillisValues.put(ts - 100000, 200000000L);
+    slotMillisValues.put(ts - 80000, 300000000L);
+    slotMillisValues.put(ts - 60000, 400000000L);
+    slotMillisValues.put(ts - 40000, 50000000000L);
+    slotMillisValues.put(ts - 20000, 60000000000L);
+    TimelineMetric slotMillis = new TimelineMetric();
+    slotMillis.setId(metric1);
+    slotMillis.setType(Type.TIME_SERIES);
+    slotMillis.setValues(slotMillisValues);
+
+    Map<Long, Number> bytesReadValues = new HashMap<Long, Number>();
+    bytesReadValues.put(ts - 900000, 31L);
+    bytesReadValues.put(ts - 30000, 57L);
+    TimelineMetric bytesRead = new TimelineMetric();
+    bytesRead.setId(metric2);
+    bytesRead.setType(Type.TIME_SERIES);
+    bytesRead.setValues(bytesReadValues);
+
+    Set<TimelineMetric> metrics = new HashSet<>();
+    metrics.add(slotMillis);
+    metrics.add(bytesRead);
+
+    TimelineEvent created = new TimelineEvent();
+    created.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    created.setTimestamp(ts);
+    created.addInfo("foo_event", "test");
+
+    TimelineEvent finished = new TimelineEvent();
+    finished.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    finished.setTimestamp(ts + 21600000); // start time + 6hrs
+    finished.addInfo("foo_event", "test");
+
+    TimelineEntity entity = new TimelineEntity();
+    entity.setId("flowRunFullEntity");
+    entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
+    entity.setCreatedTime(ts);
+    entity.addMetrics(metrics);
+    entity.addEvent(created);
+    entity.addEvent(finished);
+    return entity;
+  }
+
   static TimelineEntity getEntityGreaterStartTime(long startTs) {
     TimelineEntity entity = new TimelineEntity();
     entity.setCreatedTime(startTs);
@@ -184,6 +317,34 @@ class TestFlowDataGenerator {
     return entity;
   }
 
+  /** Builds an entity carrying a single CREATED event stamped startTs. */
+  static TimelineEntity getMinFlushEntity(long startTs) {
+    TimelineEvent createdEvent = new TimelineEvent();
+    createdEvent.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    createdEvent.setTimestamp(startTs);
+
+    TimelineEntity minEntity = new TimelineEntity();
+    minEntity.setId("flowRunHelloFlushEntityMin");
+    minEntity.setType(TimelineEntityType.YARN_APPLICATION.toString());
+    minEntity.setCreatedTime(startTs);
+    minEntity.addEvent(createdEvent);
+    return minEntity;
+  }
+
+  /**
+   * Builds an entity with a single FINISHED event at startTs + END_TS_INCR.
+   */
+  static TimelineEntity getMaxFlushEntity(long startTs) {
+    TimelineEvent finishedEvent = new TimelineEvent();
+    finishedEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    finishedEvent.setTimestamp(startTs + END_TS_INCR);
+    TimelineEntity maxEntity = new TimelineEntity();
+    maxEntity.setId("flowRunHelloFlushEntityMax");
+    maxEntity.setType(TimelineEntityType.YARN_APPLICATION.toString());
+    maxEntity.setCreatedTime(startTs);
+    maxEntity.addEvent(finishedEvent);
+    return maxEntity;
+  }
 
   static TimelineEntity getFlowApp1(long appCreatedTime) {
     TimelineEntity entity = new TimelineEntity();
@@ -203,5 +364,4 @@ class TestFlowDataGenerator {
 
     return entity;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index b234bfd..f04dd48 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -52,8 +52,8 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -216,7 +216,8 @@ public class TestHBaseStorageFlowRun {
     long runid = 1002345678919L;
 
     TimelineEntities te = new TimelineEntities();
-    TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
+    TimelineEntity entityApp1 = TestFlowDataGenerator
+        .getEntityMetricsApp1(System.currentTimeMillis());
     te.addEntity(entityApp1);
 
     HBaseTimelineWriterImpl hbi = null;
@@ -228,7 +229,8 @@ public class TestHBaseStorageFlowRun {
       hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
       // write another application with same metric to this flow
       te = new TimelineEntities();
-      TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
+      TimelineEntity entityApp2 = TestFlowDataGenerator
+          .getEntityMetricsApp2(System.currentTimeMillis());
       te.addEntity(entityApp2);
       appName = "application_11111111111111_2222";
       hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
@@ -323,7 +325,8 @@ public class TestHBaseStorageFlowRun {
     long runid = 1002345678919L;
 
     TimelineEntities te = new TimelineEntities();
-    TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
+    TimelineEntity entityApp1 = TestFlowDataGenerator
+        .getEntityMetricsApp1(System.currentTimeMillis());
     te.addEntity(entityApp1);
 
     HBaseTimelineWriterImpl hbi = null;
@@ -335,7 +338,8 @@ public class TestHBaseStorageFlowRun {
       hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
       // write another application with same metric to this flow
       te = new TimelineEntities();
-      TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
+      TimelineEntity entityApp2 = TestFlowDataGenerator
+          .getEntityMetricsApp2(System.currentTimeMillis());
       te.addEntity(entityApp2);
       appName = "application_11111111111111_2222";
       hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
@@ -420,7 +424,8 @@ public class TestHBaseStorageFlowRun {
     long runid = 1002345678919L;
 
     TimelineEntities te = new TimelineEntities();
-    TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
+    TimelineEntity entityApp1 = TestFlowDataGenerator
+        .getEntityMetricsApp1(System.currentTimeMillis());
     te.addEntity(entityApp1);
 
     HBaseTimelineWriterImpl hbi = null;
@@ -432,7 +437,8 @@ public class TestHBaseStorageFlowRun {
       hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
       // write another application with same metric to this flow
       te = new TimelineEntities();
-      TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
+      TimelineEntity entityApp2 = TestFlowDataGenerator
+          .getEntityMetricsApp2(System.currentTimeMillis());
       te.addEntity(entityApp2);
       appName = "application_11111111111111_2222";
       hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
@@ -494,6 +500,98 @@ public class TestHBaseStorageFlowRun {
     }
   }
 
+  /**
+   * Writes many min/max flush entities into one flow run, flushing every 1000
+   * iterations, and checks the min start and max end times in the table.
+   */
+  @Test
+  public void testWriteFlowRunFlush() throws Exception {
+    String cluster = "atestFlushFlowRun_cluster1";
+    String user = "atestFlushFlowRun__user1";
+    String flow = "atestFlushFlowRun_flow_name";
+    String flowVersion = "AF1021C19F1351";
+    long runid = 1449526652000L;
+
+    int start = 10;
+    int count = 20000;
+    int appIdSuffix = 1;
+    HBaseTimelineWriterImpl hbi = null;
+    long insertTs = 1449796654827L - count;
+    long minTS = insertTs + 1;
+    long startTs = insertTs;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+
+      for (int i = start; i < count; i++) {
+        String appName = "application_1060350000000_" + appIdSuffix;
+        insertTs++;
+        TimelineEntities te1 = new TimelineEntities();
+        te1.addEntity(TestFlowDataGenerator.getMinFlushEntity(insertTs));
+        te1.addEntity(TestFlowDataGenerator.getMaxFlushEntity(insertTs));
+        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+        Thread.sleep(1);
+
+        appName = "application_1001199480000_7" + appIdSuffix;
+        insertTs++;
+        appIdSuffix++;
+        te1 = new TimelineEntities();
+        te1.addEntity(TestFlowDataGenerator.getMinFlushEntity(insertTs));
+        te1.addEntity(TestFlowDataGenerator.getMaxFlushEntity(insertTs));
+        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+        if (i % 1000 == 0) {
+          hbi.flush();
+          checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow,
+              runid, false);
+        }
+      }
+      hbi.flush();
+    } finally {
+      // hbi stays null if construction failed; don't mask that with an NPE
+      if (hbi != null) {
+        hbi.close();
+      }
+    }
+    // run the final check outside finally so it cannot hide an earlier failure
+    checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, runid,
+        true);
+  }
+
+  /**
+   * Asserts the min start time (and, if {@code checkMax}, the max end time)
+   * stored in the flow run row identified by cluster, user, flow and runid.
+   * {@code startTs} and {@code count} determine the expected max end time.
+   */
+  private void checkMinMaxFlush(Configuration c1, long minTS, long startTs,
+      int count, String cluster, String user, String flow, long runid,
+      boolean checkMax) throws IOException {
+    byte[] family = FlowRunColumnFamily.INFO.getBytes();
+    byte[] minCol = FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes();
+    byte[] maxCol = FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes();
+    Get g = new Get(FlowRunRowKey.getRowKey(cluster, user, flow, runid));
+    g.addColumn(family, minCol);
+    g.addColumn(family, maxCol);
+    // close both on every call; this helper runs many times per test
+    try (Connection conn = ConnectionFactory.createConnection(c1);
+        Table table1 = conn.getTable(
+            TableName.valueOf(FlowRunTable.DEFAULT_TABLE_NAME))) {
+      Result r1 = table1.get(g);
+      assertNotNull(r1);
+      assertTrue(!r1.isEmpty());
+      assertEquals(2, r1.size());
+      Map<byte[], byte[]> values = r1.getFamilyMap(family);
+      assertEquals(minTS, Bytes.toLong(values.get(minCol)));
+      if (checkMax) {
+        // mirrors testWriteFlowRunFlush: two timestamps per iteration from 10
+        int start = 10;
+        assertEquals(startTs + 2 * (count - start)
+            + TestFlowDataGenerator.END_TS_INCR,
+            Bytes.toLong(values.get(maxCol)));
+      }
+    }
+  }
+
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     util.shutdownMiniCluster();