You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2016/03/18 02:22:28 UTC
[2/2] hadoop git commit: YARN-4062. Add the flush and compaction
functionality via coprocessors and scanners for flow run table (Vrushali C
via sjlee)
YARN-4062. Add the flush and compaction functionality via coprocessors and scanners for flow run table (Vrushali C via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bc698197
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bc698197
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bc698197
Branch: refs/heads/YARN-2928
Commit: bc698197cde0f40e6e85a9fb1a11f1f92952e91e
Parents: c6f4c51
Author: Sangjin Lee <sj...@apache.org>
Authored: Thu Mar 17 18:22:04 2016 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Thu Mar 17 18:22:04 2016 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop/yarn/conf/YarnConfiguration.java | 16 +
.../src/main/resources/yarn-default.xml | 10 +
.../storage/HBaseTimelineWriterImpl.java | 5 +-
.../storage/common/TimelineStorageUtils.java | 55 ++
.../storage/common/TimestampGenerator.java | 13 +-
.../storage/flow/AggregationOperation.java | 17 +-
.../storage/flow/FlowRunColumn.java | 4 +-
.../storage/flow/FlowRunColumnPrefix.java | 2 +-
.../storage/flow/FlowRunCoprocessor.java | 70 +-
.../storage/flow/FlowRunRowKey.java | 16 +
.../storage/flow/FlowScanner.java | 269 ++++++--
.../storage/flow/FlowScannerOperation.java | 46 ++
.../storage/flow/TestFlowDataGenerator.java | 178 +++++-
.../storage/flow/TestHBaseStorageFlowRun.java | 112 +++-
.../flow/TestHBaseStorageFlowRunCompaction.java | 635 +++++++++++++++++++
16 files changed, 1365 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 4b7fd2c..762e43c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -127,6 +127,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-4179. [reader implementation] support flow activity queries based on
time (Varun Saxena via sjlee)
+ YARN-4062. Add the flush and compaction functionality via coprocessors and
+ scanners for flow run table (Vrushali C via sjlee)
+
IMPROVEMENTS
YARN-4224. Support fetching entities by UID and change the REST interface
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 6ac6fb9..863b5a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1757,6 +1757,22 @@ public class YarnConfiguration extends Configuration {
public static final int
DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60;
+ /**
+ * The name for setting that controls how long the final value of
+ * a metric of a completed app is retained before merging
+ * into the flow sum.
+ */
+ public static final String APP_FINAL_VALUE_RETENTION_THRESHOLD =
+ TIMELINE_SERVICE_PREFIX
+ + "coprocessor.app-final-value-retention-milliseconds";
+
+ /**
+ * The setting that controls how long the final value of a metric
+ * of a completed app is retained before merging into the flow sum.
+ */
+ public static final long DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD = 3 * 24
+ * 60 * 60 * 1000L;
+
public static final String ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS =
TIMELINE_SERVICE_PREFIX + "app-collector.linger-period.ms";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 2cbc836..31b897b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2067,6 +2067,7 @@
<value>604800</value>
</property>
+ <!-- Timeline Service v2 Configuration -->
<property>
<description>The setting that controls how often the timeline collector
flushes the timeline writer.</description>
@@ -2088,6 +2089,15 @@
<name>yarn.timeline-service.timeline-client.number-of-async-entities-to-merge</name>
<value>10</value>
</property>
+
+ <property>
+ <description> The setting that controls how long the final value
+ of a metric of a completed app is retained before merging into
+ the flow sum.</description>
+ <name>yarn.timeline-service.coprocessor.app-final-value-retention-milliseconds</name>
+ <value>259200000</value>
+ </property>
+
<!-- Shared Cache Configuration -->
<property>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 1afe878..b75007d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -261,7 +261,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
flowRunId);
storeFlowMetrics(rowKey, metrics,
- AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+ AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId),
+ AggregationOperation.SUM.getAttribute());
}
}
@@ -500,4 +501,4 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
super.serviceStop();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index 605dbe7..b5fc214 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -24,9 +24,13 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
+import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -475,4 +479,55 @@ public final class TimelineStorageUtils {
return (obj instanceof Short) || (obj instanceof Integer) ||
(obj instanceof Long);
}
+
+ /**
+ * creates a new cell based on the input cell but with the new value.
+ *
+ * @param origCell Original cell
+ * @param newValue new cell value
+ * @return cell
+ * @throws IOException while creating new cell.
+ */
+ public static Cell createNewCell(Cell origCell, byte[] newValue)
+ throws IOException {
+ return CellUtil.createCell(CellUtil.cloneRow(origCell),
+ CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
+ origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
+ }
+
+ /**
+ * creates a cell with the given inputs.
+ *
+ * @param row row of the cell to be created
+ * @param family column family name of the new cell
+ * @param qualifier qualifier for the new cell
+ * @param ts timestamp of the new cell
+ * @param newValue value of the new cell
+ * @param tags tags in the new cell
+ * @return cell
+ * @throws IOException while creating the cell.
+ */
+ public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier,
+ long ts, byte[] newValue, byte[] tags) throws IOException {
+ return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put,
+ newValue, tags);
+ }
+
+ /**
+ * returns app id from the list of tags.
+ *
+ * @param tags cell tags to be looked into
+ * @return App Id as the AggregationCompactionDimension
+ */
+ public static String getAggregationCompactionDimension(List<Tag> tags) {
+ String appId = null;
+ for (Tag t : tags) {
+ if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
+ .getType()) {
+ appId = Bytes.toString(t.getValue());
+ return appId;
+ }
+ }
+ return appId;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
index 7238efa..288046c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
@@ -33,7 +33,7 @@ public class TimestampGenerator {
* if this is changed, then reading cell timestamps written with older
* multiplier value will not work
*/
- public static final long TS_MULTIPLIER = 1000L;
+ public static final long TS_MULTIPLIER = 1000000L;
private final AtomicLong lastTimestamp = new AtomicLong();
@@ -74,13 +74,14 @@ public class TimestampGenerator {
}
/**
- * returns a timestamp multiplied with TS_MULTIPLIER and last few digits of
- * application id
+ * Returns a timestamp multiplied with TS_MULTIPLIER and last few digits of
+ * application id.
*
* Unlikely scenario of generating a timestamp that is a duplicate: If more
- * than a 1000 concurrent apps are running in one flow run AND write to same
- * column at the same time, then say appId of 1001 will overlap with appId of
- * 001 and there may be collisions for that flow run's specific column.
+ * than a 1M concurrent apps are running in one flow run AND write to same
+ * column at the same time, then say appId of 1M and 1 will overlap
+ * with appId of 001 and there may be collisions for that flow run's
+ * specific column.
*
* @param incomingTS Timestamp to be converted.
* @param appId Application Id.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
index 6240e81..40cdd2c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
@@ -21,19 +21,19 @@ import org.apache.hadoop.hbase.util.Bytes;
/**
* Identifies the attributes to be set for puts into the {@link FlowRunTable}.
- * The numbers used for tagType are prime numbers
+ * The numbers used for tagType are prime numbers.
*/
public enum AggregationOperation {
/**
* When the flow was started.
*/
- MIN((byte) 71),
+ GLOBAL_MIN((byte) 71),
/**
* When it ended.
*/
- MAX((byte) 73),
+ GLOBAL_MAX((byte) 73),
/**
* The metrics of the flow.
@@ -46,9 +46,16 @@ public enum AggregationOperation {
SUM_FINAL((byte) 83),
/**
- * compact.
+ * Min value as per the latest timestamp
+ * seen for a given app.
*/
- COMPACT((byte) 89);
+ LATEST_MIN((byte) 89),
+
+ /**
+ * Max value as per the latest timestamp
+ * seen for a given app.
+ */
+ LATEST_MAX((byte) 97);
private byte tagType;
private byte[] inBytes;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
index 148a37f..d50bb16 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -41,14 +41,14 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
* application start times.
*/
MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time",
- AggregationOperation.MIN, LongConverter.getInstance()),
+ AggregationOperation.GLOBAL_MIN, LongConverter.getInstance()),
/**
* When the flow ended. This is the maximum of currently known application end
* times.
*/
MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time",
- AggregationOperation.MAX, LongConverter.getInstance()),
+ AggregationOperation.GLOBAL_MAX, LongConverter.getInstance()),
/**
* The version of the flow that this flow belongs to.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index 3d7c40e..fa94aae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -40,7 +40,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
/**
* To store flow run info values.
*/
- METRIC(FlowRunColumnFamily.INFO, "m", AggregationOperation.SUM,
+ METRIC(FlowRunColumnFamily.INFO, "m", null,
LongConverter.getInstance());
private final ColumnHelper<FlowRunTable> column;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index 9698f06..450640a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -40,7 +40,12 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
@@ -51,7 +56,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGen
*/
public class FlowRunCoprocessor extends BaseRegionObserver {
- @SuppressWarnings("unused")
private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);
private HRegion region;
@@ -160,8 +164,8 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
scan.setMaxVersions();
RegionScanner scanner = null;
try {
- scanner = new FlowScanner(region, scan.getBatch(),
- region.getScanner(scan));
+ scanner = new FlowScanner(e.getEnvironment(), scan.getBatch(),
+ region.getScanner(scan), FlowScannerOperation.READ);
scanner.next(results);
e.bypass();
} finally {
@@ -209,6 +213,64 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
public RegionScanner postScannerOpen(
ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
RegionScanner scanner) throws IOException {
- return new FlowScanner(region, scan.getBatch(), scanner);
+ return new FlowScanner(e.getEnvironment(), scan.getBatch(),
+ scanner, FlowScannerOperation.READ);
+ }
+
+ @Override
+ public InternalScanner preFlush(
+ ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+ InternalScanner scanner) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ if (store != null) {
+ LOG.debug("preFlush store = " + store.getColumnFamilyName()
+ + " flushableSize=" + store.getFlushableSize()
+ + " flushedCellsCount=" + store.getFlushedCellsCount()
+ + " compactedCellsCount=" + store.getCompactedCellsCount()
+ + " majorCompactedCellsCount="
+ + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
+ + store.getMemstoreFlushSize() + " memstoreSize="
+ + store.getMemStoreSize() + " size=" + store.getSize()
+ + " storeFilesCount=" + store.getStorefilesCount());
+ }
+ }
+ return new FlowScanner(c.getEnvironment(), -1, scanner,
+ FlowScannerOperation.FLUSH);
+ }
+
+ @Override
+ public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
+ Store store, StoreFile resultFile) {
+ if (LOG.isDebugEnabled()) {
+ if (store != null) {
+ LOG.debug("postFlush store = " + store.getColumnFamilyName()
+ + " flushableSize=" + store.getFlushableSize()
+ + " flushedCellsCount=" + store.getFlushedCellsCount()
+ + " compactedCellsCount=" + store.getCompactedCellsCount()
+ + " majorCompactedCellsCount="
+ + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
+ + store.getMemstoreFlushSize() + " memstoreSize="
+ + store.getMemStoreSize() + " size=" + store.getSize()
+ + " storeFilesCount=" + store.getStorefilesCount());
+ }
+ }
+ }
+
+ @Override
+ public InternalScanner preCompact(
+ ObserverContext<RegionCoprocessorEnvironment> e, Store store,
+ InternalScanner scanner, ScanType scanType, CompactionRequest request)
+ throws IOException {
+
+ FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION;
+ if (request != null) {
+ requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
+ : FlowScannerOperation.MINOR_COMPACTION);
+ LOG.info("Compactionrequest= " + request.toString() + " "
+ + requestOp.toString() + " RegionName="
+ + e.getEnvironment().getRegion().getRegionNameAsString());
+ }
+
+ return new FlowScanner(e.getEnvironment(), -1, scanner, requestOp);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
index 0585dc9..eac8f05 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -113,4 +113,20 @@ public class FlowRunRowKey {
TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
}
+
+ /**
+ * returns the Flow Key as a verbose String output.
+ * @return String
+ */
+ @Override
+ public String toString() {
+ StringBuilder flowKeyStr = new StringBuilder();
+ flowKeyStr.append("{clusterId=" + clusterId);
+ flowKeyStr.append(" userId=" + userId);
+ flowKeyStr.append(" flowName=" + flowName);
+ flowKeyStr.append(" flowRunId=");
+ flowKeyStr.append(flowRunId);
+ flowKeyStr.append("}");
+ return flowKeyStr.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index 6fefd15..6baea37 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -29,20 +29,26 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* Invoked via the coprocessor when a Get or a Scan is issued for flow run
@@ -55,23 +61,42 @@ class FlowScanner implements RegionScanner, Closeable {
private static final Log LOG = LogFactory.getLog(FlowScanner.class);
+ /**
+ * use a special application id to represent the flow id this is needed since
+ * TimestampGenerator parses the app id to generate a cell timestamp.
+ */
+ private static final String FLOW_APP_ID = "application_00000000000_0000";
+
private final HRegion region;
private final InternalScanner flowRunScanner;
- private RegionScanner regionScanner;
private final int limit;
+ private final long appFinalValueRetentionThreshold;
+ private RegionScanner regionScanner;
private boolean hasMore;
private byte[] currentRow;
private List<Cell> availableCells = new ArrayList<>();
private int currentIndex;
+ private FlowScannerOperation action = FlowScannerOperation.READ;
- FlowScanner(HRegion region, int limit, InternalScanner internalScanner) {
- this.region = region;
+ FlowScanner(RegionCoprocessorEnvironment env, int limit,
+ InternalScanner internalScanner, FlowScannerOperation action) {
this.limit = limit;
this.flowRunScanner = internalScanner;
if (internalScanner instanceof RegionScanner) {
this.regionScanner = (RegionScanner) internalScanner;
}
- // TODO: note if it's compaction/flush
+ this.action = action;
+ if (env == null) {
+ this.appFinalValueRetentionThreshold =
+ YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD;
+ this.region = null;
+ } else {
+ this.region = env.getRegion();
+ Configuration hbaseConf = env.getConfiguration();
+ this.appFinalValueRetentionThreshold = hbaseConf.getLong(
+ YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD,
+ YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD);
+ }
}
/*
@@ -104,17 +129,6 @@ class FlowScanner implements RegionScanner, Closeable {
return nextInternal(cells, cellLimit);
}
- private String getAggregationCompactionDimension(List<Tag> tags) {
- String appId = null;
- for (Tag t : tags) {
- if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
- .getType()) {
- appId = Bytes.toString(t.getValue());
- }
- }
- return appId;
- }
-
/**
* Get value converter associated with a column or a column prefix. If nothing
* matches, generic converter is returned.
@@ -165,6 +179,7 @@ class FlowScanner implements RegionScanner, Closeable {
* @return true if next row is available for the scanner, false otherwise
* @throws IOException
*/
+ @SuppressWarnings("deprecation")
private boolean nextInternal(List<Cell> cells, int cellLimit)
throws IOException {
Cell cell = null;
@@ -183,14 +198,18 @@ class FlowScanner implements RegionScanner, Closeable {
SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
Set<String> alreadySeenAggDim = new HashSet<>();
int addedCnt = 0;
+ long currentTimestamp = System.currentTimeMillis();
ValueConverter converter = null;
- while (((cell = peekAtNextCell(cellLimit)) != null)
- && (cellLimit <= 0 || addedCnt < cellLimit)) {
+ while (cellLimit <= 0 || addedCnt < cellLimit) {
+ cell = peekAtNextCell(cellLimit);
+ if (cell == null) {
+ break;
+ }
byte[] newColumnQualifier = CellUtil.cloneQualifier(cell);
if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
if (converter != null && isNumericConverter(converter)) {
addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
- (NumericValueConverter)converter);
+ (NumericValueConverter)converter, currentTimestamp);
}
resetState(currentColumnCells, alreadySeenAggDim);
currentColumnQualifier = newColumnQualifier;
@@ -207,8 +226,17 @@ class FlowScanner implements RegionScanner, Closeable {
nextCell(cellLimit);
}
if (!currentColumnCells.isEmpty()) {
- emitCells(cells, currentColumnCells, currentAggOp,
- (NumericValueConverter)converter);
+ addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
+ (NumericValueConverter)converter, currentTimestamp);
+ if (LOG.isDebugEnabled()) {
+ if (addedCnt > 0) {
+ LOG.debug("emitted cells. " + addedCnt + " for " + this.action
+ + " rowKey="
+ + FlowRunRowKey.parseRowKey(cells.get(0).getRow()).toString());
+ } else {
+ LOG.debug("emitted no cells for " + this.action);
+ }
+ }
}
return hasMore();
}
@@ -247,7 +275,7 @@ class FlowScanner implements RegionScanner, Closeable {
}
switch (currentAggOp) {
- case MIN:
+ case GLOBAL_MIN:
if (currentColumnCells.size() == 0) {
currentColumnCells.add(cell);
} else {
@@ -260,7 +288,7 @@ class FlowScanner implements RegionScanner, Closeable {
}
}
break;
- case MAX:
+ case GLOBAL_MAX:
if (currentColumnCells.size() == 0) {
currentColumnCells.add(cell);
} else {
@@ -275,16 +303,32 @@ class FlowScanner implements RegionScanner, Closeable {
break;
case SUM:
case SUM_FINAL:
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("In collect cells "
+ + " FlowScannerOperation="
+ + this.action
+ + " currentAggOp="
+ + currentAggOp
+ + " cell qualifier="
+ + Bytes.toString(CellUtil.cloneQualifier(cell))
+ + " cell value= "
+ + (Number) converter.decodeValue(CellUtil.cloneValue(cell))
+ + " timestamp=" + cell.getTimestamp());
+ }
+
// only if this app has not been seen yet, add to current column cells
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
- String aggDim = getAggregationCompactionDimension(tags);
-
- // If this agg dimension has already been seen, since they show up in
- // sorted order, we drop the rest which are older. In other words, this
- // cell is older than previously seen cells for that agg dim.
+ String aggDim = TimelineStorageUtils
+ .getAggregationCompactionDimension(tags);
if (!alreadySeenAggDim.contains(aggDim)) {
- // Not seen this agg dim, hence consider this cell in our working set
+ // if this agg dimension has already been seen,
+ // since they show up in sorted order
+ // we drop the rest which are older
+ // in other words, this cell is older than previously seen cells
+ // for that agg dim
+ // but when this agg dim is not seen,
+ // consider this cell in our working set
currentColumnCells.add(cell);
alreadySeenAggDim.add(aggDim);
}
@@ -300,8 +344,8 @@ class FlowScanner implements RegionScanner, Closeable {
* parameter.
*/
private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
- AggregationOperation currentAggOp, NumericValueConverter converter)
- throws IOException {
+ AggregationOperation currentAggOp, NumericValueConverter converter,
+ long currentTimestamp) throws IOException {
if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
return 0;
}
@@ -309,17 +353,36 @@ class FlowScanner implements RegionScanner, Closeable {
cells.addAll(currentColumnCells);
return currentColumnCells.size();
}
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("In emitCells " + this.action + " currentColumnCells size= "
+ + currentColumnCells.size() + " currentAggOp" + currentAggOp);
+ }
switch (currentAggOp) {
- case MIN:
- case MAX:
+ case GLOBAL_MIN:
+ case GLOBAL_MAX:
cells.addAll(currentColumnCells);
return currentColumnCells.size();
case SUM:
case SUM_FINAL:
- Cell sumCell = processSummation(currentColumnCells, converter);
- cells.add(sumCell);
- return 1;
+ switch (action) {
+ case FLUSH:
+ case MINOR_COMPACTION:
+ cells.addAll(currentColumnCells);
+ return currentColumnCells.size();
+ case READ:
+ Cell sumCell = processSummation(currentColumnCells, converter);
+ cells.add(sumCell);
+ return 1;
+ case MAJOR_COMPACTION:
+ List<Cell> finalCells = processSummationMajorCompaction(
+ currentColumnCells, converter, currentTimestamp);
+ cells.addAll(finalCells);
+ return finalCells.size();
+ default:
+ cells.addAll(currentColumnCells);
+ return currentColumnCells.size();
+ }
default:
cells.addAll(currentColumnCells);
return currentColumnCells.size();
@@ -349,10 +412,122 @@ class FlowScanner implements RegionScanner, Closeable {
sum = converter.add(sum, currentValue);
}
byte[] sumBytes = converter.encodeValue(sum);
- Cell sumCell = createNewCell(mostRecentCell, sumBytes);
+ Cell sumCell = TimelineStorageUtils.createNewCell(mostRecentCell, sumBytes);
return sumCell;
}
+
+ /**
+ * Returns a list of cells that contains
+ *
+ * A) the latest cells for applications that haven't finished yet
+ * B) summation
+ * for the flow, based on applications that have completed and are older than
+ * a certain time
+ *
+ * The new cell created has the timestamp of the most recent metric cell. The
+ * sum of a metric for a flow run is the summation at the point of the last
+ * metric update in that flow till that time.
+ */
+ @VisibleForTesting
+ List<Cell> processSummationMajorCompaction(
+ SortedSet<Cell> currentColumnCells, NumericValueConverter converter,
+ long currentTimestamp)
+ throws IOException {
+ Number sum = 0;
+ Number currentValue = 0;
+ long ts = 0L;
+ boolean summationDone = false;
+ List<Cell> finalCells = new ArrayList<Cell>();
+ if (currentColumnCells == null) {
+ return finalCells;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("In processSummationMajorCompaction,"
+ + " will drop cells older than " + currentTimestamp
+ + " CurrentColumnCells size=" + currentColumnCells.size());
+ }
+
+ for (Cell cell : currentColumnCells) {
+ AggregationOperation cellAggOp = getCurrentAggOp(cell);
+ // if this is the existing flow sum cell
+ List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLength());
+ String appId = TimelineStorageUtils
+ .getAggregationCompactionDimension(tags);
+ if (appId == FLOW_APP_ID) {
+ sum = converter.add(sum, currentValue);
+ summationDone = true;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("reading flow app id sum=" + sum);
+ }
+ } else {
+ currentValue = (Number) converter.decodeValue(CellUtil
+ .cloneValue(cell));
+ // read the timestamp truncated by the generator
+ ts = TimestampGenerator.getTruncatedTimestamp(cell.getTimestamp());
+ if ((cellAggOp == AggregationOperation.SUM_FINAL)
+ && ((ts + this.appFinalValueRetentionThreshold)
+ < currentTimestamp)) {
+ sum = converter.add(sum, currentValue);
+ summationDone = true;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("MAJOR COMPACTION loop sum= " + sum
+ + " discarding now: " + " qualifier="
+ + Bytes.toString(CellUtil.cloneQualifier(cell)) + " value="
+ + (Number) converter.decodeValue(CellUtil.cloneValue(cell))
+ + " timestamp=" + cell.getTimestamp() + " " + this.action);
+ }
+ } else {
+ // not a final value but it's the latest cell for this app
+ // so include this cell in the list of cells to write back
+ finalCells.add(cell);
+ }
+ }
+ }
+ if (summationDone) {
+ Cell anyCell = currentColumnCells.first();
+ List<Tag> tags = new ArrayList<Tag>();
+ Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ Bytes.toBytes(FLOW_APP_ID));
+ tags.add(t);
+ t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
+ Bytes.toBytes(FLOW_APP_ID));
+ tags.add(t);
+ byte[] tagByteArray = Tag.fromList(tags);
+ Cell sumCell = TimelineStorageUtils.createNewCell(
+ CellUtil.cloneRow(anyCell),
+ CellUtil.cloneFamily(anyCell),
+ CellUtil.cloneQualifier(anyCell),
+ TimestampGenerator.getSupplementedTimestamp(
+ System.currentTimeMillis(), FLOW_APP_ID),
+ converter.encodeValue(sum), tagByteArray);
+ finalCells.add(sumCell);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("MAJOR COMPACTION final sum= " + sum + " for "
+ + Bytes.toString(CellUtil.cloneQualifier(sumCell))
+ + " " + this.action);
+ }
+ LOG.info("After major compaction for qualifier="
+ + Bytes.toString(CellUtil.cloneQualifier(sumCell))
+ + " with currentColumnCells.size="
+ + currentColumnCells.size()
+ + " returning finalCells.size=" + finalCells.size()
+ + " with sum=" + sum.longValue()
+ + " with cell timestamp " + sumCell.getTimestamp());
+ } else {
+ String qualifier = "";
+ LOG.info("After major compaction for qualifier=" + qualifier
+ + " with currentColumnCells.size="
+ + currentColumnCells.size()
+ + " returning finalCells.size=" + finalCells.size()
+ + " with zero sum="
+ + sum.longValue());
+ }
+ return finalCells;
+ }
+
/**
* Determines which cell is to be returned based on the values in each cell
* and the comparison operation MIN or MAX.
@@ -375,7 +550,7 @@ class FlowScanner implements RegionScanner, Closeable {
Number currentCellValue = (Number) converter.decodeValue(CellUtil
.cloneValue(currentCell));
switch (currentAggOp) {
- case MIN:
+ case GLOBAL_MIN:
if (converter.compare(
currentCellValue, previouslyChosenCellValue) < 0) {
// new value is minimum, hence return this cell
@@ -384,7 +559,7 @@ class FlowScanner implements RegionScanner, Closeable {
// previously chosen value is minimum, hence return previous min cell
return previouslyChosenCell;
}
- case MAX:
+ case GLOBAL_MAX:
if (converter.compare(
currentCellValue, previouslyChosenCellValue) > 0) {
// new value is max, hence return this cell
@@ -402,16 +577,13 @@ class FlowScanner implements RegionScanner, Closeable {
}
}
- private Cell createNewCell(Cell origCell, byte[] newValue)
- throws IOException {
- return CellUtil.createCell(CellUtil.cloneRow(origCell),
- CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
- origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
- }
-
@Override
public void close() throws IOException {
- flowRunScanner.close();
+ if (flowRunScanner != null) {
+ flowRunScanner.close();
+ } else {
+ LOG.warn("scanner close called but scanner is null");
+ }
}
/**
@@ -423,8 +595,6 @@ class FlowScanner implements RegionScanner, Closeable {
/**
* Returns whether or not the underlying scanner has more rows.
- *
- * @return true, if there are more cells to return, false otherwise.
*/
public boolean hasMore() {
return currentIndex < availableCells.size() ? true : hasMore;
@@ -440,8 +610,7 @@ class FlowScanner implements RegionScanner, Closeable {
* fetched by the wrapped scanner
* @return the next available cell or null if no more cells are available for
* the current row
- * @throws IOException if any problem is encountered while grabbing the next
- * cell.
+ * @throws IOException if any problem is encountered while grabbing the next cell
*/
public Cell nextCell(int cellLimit) throws IOException {
Cell cell = peekAtNextCell(cellLimit);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
new file mode 100644
index 0000000..73c666f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+
/**
 * Identifies the operation a {@link FlowRunTable} scanner was opened for.
 * The scanner treats SUM columns differently depending on whether it is
 * serving a read, a flush, or a compaction.
 */
public enum FlowScannerOperation {

  /** Scanner opened for reading, during preGet or preScan. */
  READ,

  /** Scanner opened during preFlush. */
  FLUSH,

  /** Scanner opened during a minor compaction. */
  MINOR_COMPACTION,

  /** Scanner opened during a major compaction. */
  MAJOR_COMPACTION
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
index d45df57..9793ce6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -29,17 +28,60 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.conf.Configuration;
/**
* Generates the data/entities for the FlowRun and FlowActivity Tables
*/
class TestFlowDataGenerator {
- private final static String metric1 = "MAP_SLOT_MILLIS";
- private final static String metric2 = "HDFS_BYTES_READ";
+ private static final String metric1 = "MAP_SLOT_MILLIS";
+ private static final String metric2 = "HDFS_BYTES_READ";
+ public static final long END_TS_INCR = 10000L;
+
+ static TimelineEntity getEntityMetricsApp1(long insertTs, Configuration c1) {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunMetrics_test";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ long cTime = 1425016501000L;
+ entity.setCreatedTime(cTime);
+ // add metrics
+ Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId(metric1);
+ Map<Long, Number> metricValues = new HashMap<Long, Number>();
+ long ts = insertTs;
- static TimelineEntity getEntityMetricsApp1() {
+ for (int k=1; k< 100 ; k++) {
+ metricValues.put(ts - k*200000, 20L);
+ }
+ metricValues.put(ts - 80000, 40L);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+
+ TimelineMetric m2 = new TimelineMetric();
+ m2.setId(metric2);
+ metricValues = new HashMap<Long, Number>();
+ ts = System.currentTimeMillis();
+ for (int k=1; k< 100 ; k++) {
+ metricValues.put(ts - k*100000, 31L);
+ }
+
+ metricValues.put(ts - 80000, 57L);
+ m2.setType(Type.TIME_SERIES);
+ m2.setValues(metricValues);
+ metrics.add(m2);
+
+ entity.addMetrics(metrics);
+ return entity;
+ }
+
+
+ static TimelineEntity getEntityMetricsApp1Complete(long insertTs, Configuration c1) {
TimelineEntity entity = new TimelineEntity();
String id = "flowRunMetrics_test";
String type = TimelineEntityType.YARN_APPLICATION.toString();
@@ -53,7 +95,48 @@ class TestFlowDataGenerator {
TimelineMetric m1 = new TimelineMetric();
m1.setId(metric1);
Map<Long, Number> metricValues = new HashMap<Long, Number>();
- long ts = System.currentTimeMillis();
+ long ts = insertTs;
+
+ metricValues.put(ts - 80000, 40L);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+
+ TimelineMetric m2 = new TimelineMetric();
+ m2.setId(metric2);
+ metricValues = new HashMap<Long, Number>();
+ ts = insertTs;
+ metricValues.put(ts - 80000, 57L);
+ m2.setType(Type.TIME_SERIES);
+ m2.setValues(metricValues);
+ metrics.add(m2);
+
+ entity.addMetrics(metrics);
+
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+ event.setTimestamp(insertTs);
+ event.addInfo("done", "insertTs=" + insertTs);
+ entity.addEvent(event);
+ return entity;
+ }
+
+
+ static TimelineEntity getEntityMetricsApp1(long insertTs) {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunMetrics_test";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ long cTime = 1425016501000L;
+ entity.setCreatedTime(cTime);
+
+ // add metrics
+ Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId(metric1);
+ Map<Long, Number> metricValues = new HashMap<Long, Number>();
+ long ts = insertTs;
metricValues.put(ts - 100000, 2L);
metricValues.put(ts - 80000, 40L);
m1.setType(Type.TIME_SERIES);
@@ -63,7 +146,7 @@ class TestFlowDataGenerator {
TimelineMetric m2 = new TimelineMetric();
m2.setId(metric2);
metricValues = new HashMap<Long, Number>();
- ts = System.currentTimeMillis();
+ ts = insertTs;
metricValues.put(ts - 100000, 31L);
metricValues.put(ts - 80000, 57L);
m2.setType(Type.TIME_SERIES);
@@ -74,7 +157,8 @@ class TestFlowDataGenerator {
return entity;
}
- static TimelineEntity getEntityMetricsApp2() {
+
+ static TimelineEntity getEntityMetricsApp2(long insertTs) {
TimelineEntity entity = new TimelineEntity();
String id = "flowRunMetrics_test";
String type = TimelineEntityType.YARN_APPLICATION.toString();
@@ -87,7 +171,7 @@ class TestFlowDataGenerator {
TimelineMetric m1 = new TimelineMetric();
m1.setId(metric1);
Map<Long, Number> metricValues = new HashMap<Long, Number>();
- long ts = System.currentTimeMillis();
+ long ts = insertTs;
metricValues.put(ts - 100000, 5L);
metricValues.put(ts - 80000, 101L);
m1.setType(Type.TIME_SERIES);
@@ -140,6 +224,55 @@ class TestFlowDataGenerator {
return entity;
}
+ static TimelineEntity getAFullEntity(long ts, long endTs) {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunFullEntity";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ entity.setCreatedTime(ts);
+ // add metrics
+ Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId(metric1);
+ Map<Long, Number> metricValues = new HashMap<Long, Number>();
+ metricValues.put(ts - 120000, 100000000L);
+ metricValues.put(ts - 100000, 200000000L);
+ metricValues.put(ts - 80000, 300000000L);
+ metricValues.put(ts - 60000, 400000000L);
+ metricValues.put(ts - 40000, 50000000000L);
+ metricValues.put(ts - 20000, 60000000000L);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+ TimelineMetric m2 = new TimelineMetric();
+ m2.setId(metric2);
+ metricValues = new HashMap<Long, Number>();
+ metricValues.put(ts - 900000, 31L);
+ metricValues.put(ts - 30000, 57L);
+ m2.setType(Type.TIME_SERIES);
+ m2.setValues(metricValues);
+ metrics.add(m2);
+ entity.addMetrics(metrics);
+
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ event.setTimestamp(ts);
+ String expKey = "foo_event";
+ Object expVal = "test";
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+
+ event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+ long expTs = ts + 21600000;// start time + 6hrs
+ event.setTimestamp(expTs);
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+
+ return entity;
+ }
+
static TimelineEntity getEntityGreaterStartTime(long startTs) {
TimelineEntity entity = new TimelineEntity();
entity.setCreatedTime(startTs);
@@ -184,6 +317,34 @@ class TestFlowDataGenerator {
return entity;
}
+ static TimelineEntity getMinFlushEntity(long startTs) {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunHelloFlushEntityMin";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ entity.setCreatedTime(startTs);
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ event.setTimestamp(startTs);
+ entity.addEvent(event);
+ return entity;
+ }
+
+ static TimelineEntity getMaxFlushEntity(long startTs) {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunHelloFlushEntityMax";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ entity.setCreatedTime(startTs);
+
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+ event.setTimestamp(startTs + END_TS_INCR);
+ entity.addEvent(event);
+ return entity;
+ }
static TimelineEntity getFlowApp1(long appCreatedTime) {
TimelineEntity entity = new TimelineEntity();
@@ -203,5 +364,4 @@ class TestFlowDataGenerator {
return entity;
}
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index b234bfd..f04dd48 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -52,8 +52,8 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -216,7 +216,8 @@ public class TestHBaseStorageFlowRun {
long runid = 1002345678919L;
TimelineEntities te = new TimelineEntities();
- TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
+ TimelineEntity entityApp1 = TestFlowDataGenerator
+ .getEntityMetricsApp1(System.currentTimeMillis());
te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null;
@@ -228,7 +229,8 @@ public class TestHBaseStorageFlowRun {
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another application with same metric to this flow
te = new TimelineEntities();
- TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
+ TimelineEntity entityApp2 = TestFlowDataGenerator
+ .getEntityMetricsApp2(System.currentTimeMillis());
te.addEntity(entityApp2);
appName = "application_11111111111111_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
@@ -323,7 +325,8 @@ public class TestHBaseStorageFlowRun {
long runid = 1002345678919L;
TimelineEntities te = new TimelineEntities();
- TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
+ TimelineEntity entityApp1 = TestFlowDataGenerator
+ .getEntityMetricsApp1(System.currentTimeMillis());
te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null;
@@ -335,7 +338,8 @@ public class TestHBaseStorageFlowRun {
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another application with same metric to this flow
te = new TimelineEntities();
- TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
+ TimelineEntity entityApp2 = TestFlowDataGenerator
+ .getEntityMetricsApp2(System.currentTimeMillis());
te.addEntity(entityApp2);
appName = "application_11111111111111_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
@@ -420,7 +424,8 @@ public class TestHBaseStorageFlowRun {
long runid = 1002345678919L;
TimelineEntities te = new TimelineEntities();
- TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
+ TimelineEntity entityApp1 = TestFlowDataGenerator
+ .getEntityMetricsApp1(System.currentTimeMillis());
te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null;
@@ -432,7 +437,8 @@ public class TestHBaseStorageFlowRun {
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another application with same metric to this flow
te = new TimelineEntities();
- TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
+ TimelineEntity entityApp2 = TestFlowDataGenerator
+ .getEntityMetricsApp2(System.currentTimeMillis());
te.addEntity(entityApp2);
appName = "application_11111111111111_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
@@ -494,6 +500,98 @@ public class TestHBaseStorageFlowRun {
}
}
+ @Test
+ public void testWriteFlowRunFlush() throws Exception {
+ String cluster = "atestFlushFlowRun_cluster1";
+ String user = "atestFlushFlowRun__user1";
+ String flow = "atestFlushFlowRun_flow_name";
+ String flowVersion = "AF1021C19F1351";
+ long runid = 1449526652000L;
+
+ int start = 10;
+ int count = 20000;
+ int appIdSuffix = 1;
+ HBaseTimelineWriterImpl hbi = null;
+ long insertTs = 1449796654827L - count;
+ long minTS = insertTs + 1;
+ long startTs = insertTs;
+ Configuration c1 = util.getConfiguration();
+ TimelineEntities te1 = null;
+ TimelineEntity entityApp1 = null;
+ TimelineEntity entityApp2 = null;
+ try {
+ hbi = new HBaseTimelineWriterImpl(c1);
+ hbi.init(c1);
+
+ for (int i = start; i < count; i++) {
+ String appName = "application_1060350000000_" + appIdSuffix;
+ insertTs++;
+ te1 = new TimelineEntities();
+ entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs);
+ te1.addEntity(entityApp1);
+ entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs);
+ te1.addEntity(entityApp2);
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+ Thread.sleep(1);
+
+ appName = "application_1001199480000_7" + appIdSuffix;
+ insertTs++;
+ appIdSuffix++;
+ te1 = new TimelineEntities();
+ entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs);
+ te1.addEntity(entityApp1);
+ entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs);
+ te1.addEntity(entityApp2);
+
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+ if (i % 1000 == 0) {
+ hbi.flush();
+ checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow,
+ runid, false);
+ }
+ }
+ } finally {
+ hbi.flush();
+ hbi.close();
+ checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, runid,
+ true);
+ }
+ }
+
+ private void checkMinMaxFlush(Configuration c1, long minTS, long startTs,
+ int count, String cluster, String user, String flow, long runid,
+ boolean checkMax) throws IOException {
+ Connection conn = ConnectionFactory.createConnection(c1);
+ // check in flow run table
+ Table table1 = conn.getTable(TableName
+ .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+ // scan the table and see that we get back the right min and max
+ // timestamps
+ byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+ Get g = new Get(startRow);
+ g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
+ FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
+ g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
+ FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes());
+
+ Result r1 = table1.get(g);
+ assertNotNull(r1);
+ assertTrue(!r1.isEmpty());
+ Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO
+ .getBytes());
+ int start = 10;
+ assertEquals(2, r1.size());
+ long starttime = Bytes.toLong(values
+ .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
+ assertEquals(minTS, starttime);
+ if (checkMax) {
+ assertEquals(startTs + 2 * (count - start)
+ + TestFlowDataGenerator.END_TS_INCR,
+ Bytes.toLong(values
+ .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
+ }
+ }
+
@AfterClass
public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster();