You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2016/01/20 10:13:44 UTC
[27/50] [abbrv] hadoop git commit: YARN-4178. [storage
implementation] app id as string in row keys can cause incorrect ordering
(Varun Saxena via sjlee)
YARN-4178. [storage implementation] app id as string in row keys can cause incorrect ordering (Varun Saxena via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fdaa1e4e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fdaa1e4e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fdaa1e4e
Branch: refs/heads/feature-YARN-2928
Commit: fdaa1e4e16e71d9793b4302a50a4335356e3c70f
Parents: 2b6784a
Author: Sangjin Lee <sj...@apache.org>
Authored: Tue Oct 6 16:06:28 2015 -0700
Committer: Li Lu <gt...@apache.org>
Committed: Tue Jan 19 17:41:58 2016 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../storage/ApplicationEntityReader.java | 14 +-
.../storage/FileSystemTimelineReaderImpl.java | 14 +-
.../storage/GenericEntityReader.java | 17 +-
.../storage/HBaseTimelineWriterImpl.java | 20 +-
.../storage/application/ApplicationRowKey.java | 13 +-
.../storage/apptoflow/AppToFlowRowKey.java | 7 +-
.../storage/common/Separator.java | 4 +-
.../storage/common/TimelineReaderUtils.java | 112 -----
.../storage/common/TimelineStorageUtils.java | 475 +++++++++++++++++++
.../storage/common/TimelineWriterUtils.java | 328 -------------
.../storage/entity/EntityRowKey.java | 32 +-
.../storage/flow/FlowActivityColumnPrefix.java | 6 +-
.../storage/flow/FlowActivityRowKey.java | 9 +-
.../storage/flow/FlowRunColumn.java | 4 +-
.../storage/flow/FlowRunColumnPrefix.java | 6 +-
.../storage/flow/FlowRunCoprocessor.java | 4 +-
.../storage/flow/FlowRunRowKey.java | 6 +-
.../storage/flow/FlowScanner.java | 6 +-
.../storage/TestHBaseTimelineStorage.java | 13 +-
.../common/TestTimelineStorageUtils.java | 56 +++
.../storage/common/TestTimelineWriterUtils.java | 29 --
.../flow/TestHBaseStorageFlowActivity.java | 10 +-
23 files changed, 629 insertions(+), 559 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 902d05e..4ed4949 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -168,6 +168,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-4058. Miscellaneous issues in NodeManager project (Naganarasimha G R
via sjlee)
+ YARN-4178. [storage implementation] app id as string in row keys can cause
+ incorrect ordering (Varun Saxena via sjlee)
+
Trunk - Unreleased
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
index 61954e1..6d1a2ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import com.google.common.base.Preconditions;
@@ -182,7 +182,7 @@ class ApplicationEntityReader extends GenericEntityReader {
fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
true);
- if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
+ if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
entity.getIsRelatedToEntities(), isRelatedTo)) {
return null;
}
@@ -198,7 +198,7 @@ class ApplicationEntityReader extends GenericEntityReader {
fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
false);
- if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
+ if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
entity.getRelatesToEntities(), relatesTo)) {
return null;
}
@@ -214,7 +214,7 @@ class ApplicationEntityReader extends GenericEntityReader {
fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
if (checkInfo &&
- !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+ !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
return null;
}
if (!fieldsToRetrieve.contains(Field.ALL) &&
@@ -228,7 +228,7 @@ class ApplicationEntityReader extends GenericEntityReader {
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
- if (checkConfigs && !TimelineReaderUtils.matchFilters(
+ if (checkConfigs && !TimelineStorageUtils.matchFilters(
entity.getConfigs(), configFilters)) {
return null;
}
@@ -243,7 +243,7 @@ class ApplicationEntityReader extends GenericEntityReader {
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
readEvents(entity, result, true);
- if (checkEvents && !TimelineReaderUtils.matchEventFilters(
+ if (checkEvents && !TimelineStorageUtils.matchEventFilters(
entity.getEvents(), eventFilters)) {
return null;
}
@@ -258,7 +258,7 @@ class ApplicationEntityReader extends GenericEntityReader {
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
- if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
+ if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
entity.getMetrics(), metricFilters)) {
return null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
index 626c770..30d1d00 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -44,7 +44,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
@@ -321,31 +321,31 @@ public class FileSystemTimelineReaderImpl extends AbstractService
continue;
}
if (relatesTo != null && !relatesTo.isEmpty() &&
- !TimelineReaderUtils
+ !TimelineStorageUtils
.matchRelations(entity.getRelatesToEntities(), relatesTo)) {
continue;
}
if (isRelatedTo != null && !isRelatedTo.isEmpty() &&
- !TimelineReaderUtils
+ !TimelineStorageUtils
.matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) {
continue;
}
if (infoFilters != null && !infoFilters.isEmpty() &&
- !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+ !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
continue;
}
if (configFilters != null && !configFilters.isEmpty() &&
- !TimelineReaderUtils.matchFilters(
+ !TimelineStorageUtils.matchFilters(
entity.getConfigs(), configFilters)) {
continue;
}
if (metricFilters != null && !metricFilters.isEmpty() &&
- !TimelineReaderUtils.matchMetricFilters(
+ !TimelineStorageUtils.matchMetricFilters(
entity.getMetrics(), metricFilters)) {
continue;
}
if (eventFilters != null && !eventFilters.isEmpty() &&
- !TimelineReaderUtils.matchEventFilters(
+ !TimelineStorageUtils.matchEventFilters(
entity.getEvents(), eventFilters)) {
continue;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
index 42079d7..c18966f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
@@ -44,8 +44,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
@@ -220,7 +219,7 @@ class GenericEntityReader extends TimelineEntityReader {
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
- if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
+ if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
entity.getIsRelatedToEntities(), isRelatedTo)) {
return null;
}
@@ -235,7 +234,7 @@ class GenericEntityReader extends TimelineEntityReader {
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
- if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
+ if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
entity.getRelatesToEntities(), relatesTo)) {
return null;
}
@@ -251,7 +250,7 @@ class GenericEntityReader extends TimelineEntityReader {
fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
if (checkInfo &&
- !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+ !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
return null;
}
if (!fieldsToRetrieve.contains(Field.ALL) &&
@@ -265,7 +264,7 @@ class GenericEntityReader extends TimelineEntityReader {
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
- if (checkConfigs && !TimelineReaderUtils.matchFilters(
+ if (checkConfigs && !TimelineStorageUtils.matchFilters(
entity.getConfigs(), configFilters)) {
return null;
}
@@ -280,7 +279,7 @@ class GenericEntityReader extends TimelineEntityReader {
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
readEvents(entity, result, false);
- if (checkEvents && !TimelineReaderUtils.matchEventFilters(
+ if (checkEvents && !TimelineStorageUtils.matchEventFilters(
entity.getEvents(), eventFilters)) {
return null;
}
@@ -295,7 +294,7 @@ class GenericEntityReader extends TimelineEntityReader {
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
readMetrics(entity, result, EntityColumnPrefix.METRIC);
- if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
+ if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
entity.getMetrics(), metricFilters)) {
return null;
}
@@ -365,7 +364,7 @@ class GenericEntityReader extends TimelineEntityReader {
// the column name is of the form "eventId=timestamp=infoKey"
if (karr.length == 3) {
String id = Bytes.toString(karr[0]);
- long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1]));
+ long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1]));
String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
TimelineEvent event = eventsMap.get(key);
if (event == null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 7c4a5da..3649865 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -125,7 +125,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
// if the entity is the application, the destination is the application
// table
- boolean isApplication = TimelineWriterUtils.isApplicationEntity(te);
+ boolean isApplication = TimelineStorageUtils.isApplicationEntity(te);
byte[] rowKey = isApplication ?
ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId,
appId) :
@@ -139,7 +139,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
storeRelations(rowKey, te, isApplication);
if (isApplication) {
- if (TimelineWriterUtils.isApplicationCreated(te)) {
+ if (TimelineStorageUtils.isApplicationCreated(te)) {
onApplicationCreated(clusterId, userId, flowName, flowVersion,
flowRunId, appId, te);
}
@@ -149,7 +149,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
// if application has finished, store it's finish time and write final
// values
// of all metrics
- if (TimelineWriterUtils.isApplicationFinished(te)) {
+ if (TimelineStorageUtils.isApplicationFinished(te)) {
onApplicationFinished(clusterId, userId, flowName, flowVersion,
flowRunId, appId, te);
}
@@ -234,7 +234,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
Attribute attributeAppId = AggregationCompactionDimension.APPLICATION_ID
.getAttribute(appId);
FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
- TimelineWriterUtils.getApplicationFinishedTime(te), attributeAppId);
+ TimelineStorageUtils.getApplicationFinishedTime(te), attributeAppId);
// store the final value of metrics since application has finished
Set<TimelineMetric> metrics = te.getMetrics();
@@ -406,9 +406,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
}
byte[] columnQualifierFirst =
Bytes.toBytes(Separator.VALUES.encode(eventId));
- byte[] columnQualifierWithTsBytes =
- Separator.VALUES.join(columnQualifierFirst,
- Bytes.toBytes(TimelineWriterUtils.invert(eventTimestamp)));
+ byte[] columnQualifierWithTsBytes = Separator.VALUES.
+ join(columnQualifierFirst, Bytes.toBytes(
+ TimelineStorageUtils.invertLong(eventTimestamp)));
Map<String, Object> eventInfo = event.getInfo();
if ((eventInfo == null) || (eventInfo.size() == 0)) {
// add separator since event key is empty
@@ -418,11 +418,11 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
if (isApplication) {
ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
compoundColumnQualifierBytes, null,
- TimelineWriterUtils.EMPTY_BYTES);
+ TimelineStorageUtils.EMPTY_BYTES);
} else {
EntityColumnPrefix.EVENT.store(rowKey, entityTable,
compoundColumnQualifierBytes, null,
- TimelineWriterUtils.EMPTY_BYTES);
+ TimelineStorageUtils.EMPTY_BYTES);
}
} else {
for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
index 10e3c2e..1cf6145 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.application;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
/**
* Represents a rowkey for the application table.
@@ -90,7 +90,7 @@ public class ApplicationRowKey {
String flowId, Long flowRunId) {
byte[] first = Bytes.toBytes(
Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowId));
- byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
+ byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
return Separator.QUALIFIERS.join(first, second, new byte[0]);
}
@@ -112,8 +112,8 @@ public class ApplicationRowKey {
flowId));
// Note that flowRunId is a long, so we can't encode them all at the same
// time.
- byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
- byte[] third = Bytes.toBytes(appId);
+ byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
+ byte[] third = TimelineStorageUtils.encodeAppId(appId);
return Separator.QUALIFIERS.join(first, second, third);
}
@@ -135,9 +135,8 @@ public class ApplicationRowKey {
String flowId =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
long flowRunId =
- TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
- String appId =
- Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4]));
+ TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
+ String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[4]);
return new ApplicationRowKey(clusterId, userId, flowId, flowRunId, appId);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
index ca88056..133952e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
/**
* Represents a rowkey for the app_flow table.
@@ -49,7 +50,9 @@ public class AppToFlowRowKey {
* @return byte array with the row key
*/
public static byte[] getRowKey(String clusterId, String appId) {
- return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, appId));
+ byte[] first = Bytes.toBytes(clusterId);
+ byte[] second = TimelineStorageUtils.encodeAppId(appId);
+ return Separator.QUALIFIERS.join(first, second);
}
/**
@@ -64,7 +67,7 @@ public class AppToFlowRowKey {
}
String clusterId = Bytes.toString(rowKeyComponents[0]);
- String appId = Bytes.toString(rowKeyComponents[1]);
+ String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[1]);
return new AppToFlowRowKey(clusterId, appId);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
index 9f91af8..1e82494 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
@@ -304,7 +304,7 @@ public enum Separator {
* @return source split by this separator.
*/
public byte[][] split(byte[] source, int limit) {
- return TimelineWriterUtils.split(source, this.bytes, limit);
+ return TimelineStorageUtils.split(source, this.bytes, limit);
}
/**
@@ -315,6 +315,6 @@ public enum Separator {
* @return source split by this separator.
*/
public byte[][] split(byte[] source) {
- return TimelineWriterUtils.split(source, this.bytes);
+ return TimelineStorageUtils.split(source, this.bytes);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java
deleted file mode 100644
index 91d7ba4..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.common;
-
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-
-public class TimelineReaderUtils {
- /**
- *
- * @param entityRelations the relations of an entity
- * @param relationFilters the relations for filtering
- * @return a boolean flag to indicate if both match
- */
- public static boolean matchRelations(
- Map<String, Set<String>> entityRelations,
- Map<String, Set<String>> relationFilters) {
- for (Map.Entry<String, Set<String>> relation : relationFilters.entrySet()) {
- Set<String> ids = entityRelations.get(relation.getKey());
- if (ids == null) {
- return false;
- }
- for (String id : relation.getValue()) {
- if (!ids.contains(id)) {
- return false;
- }
- }
- }
- return true;
- }
-
- /**
- *
- * @param map the map of key/value pairs in an entity
- * @param filters the map of key/value pairs for filtering
- * @return a boolean flag to indicate if both match
- */
- public static boolean matchFilters(Map<String, ? extends Object> map,
- Map<String, ? extends Object> filters) {
- for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
- Object value = map.get(filter.getKey());
- if (value == null) {
- return false;
- }
- if (!value.equals(filter.getValue())) {
- return false;
- }
- }
- return true;
- }
-
- /**
- *
- * @param entityEvents the set of event objects in an entity
- * @param eventFilters the set of event Ids for filtering
- * @return a boolean flag to indicate if both match
- */
- public static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
- Set<String> eventFilters) {
- Set<String> eventIds = new HashSet<String>();
- for (TimelineEvent event : entityEvents) {
- eventIds.add(event.getId());
- }
- for (String eventFilter : eventFilters) {
- if (!eventIds.contains(eventFilter)) {
- return false;
- }
- }
- return true;
- }
-
- /**
- *
- * @param metrics the set of metric objects in an entity
- * @param metricFilters the set of metric Ids for filtering
- * @return a boolean flag to indicate if both match
- */
- public static boolean matchMetricFilters(Set<TimelineMetric> metrics,
- Set<String> metricFilters) {
- Set<String> metricIds = new HashSet<String>();
- for (TimelineMetric metric : metrics) {
- metricIds.add(metric.getId());
- }
-
- for (String metricFilter : metricFilters) {
- if (!metricIds.contains(metricFilter)) {
- return false;
- }
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
new file mode 100644
index 0000000..c1aaf19
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+/**
+ * A bunch of utility functions used across TimelineReader and TimelineWriter.
+ */
+@Public
+@Unstable
+public class TimelineStorageUtils {
+
+ /** empty bytes */
+ public static final byte[] EMPTY_BYTES = new byte[0];
+
+ /** indicator for no limits for splitting */
+ public static final int NO_LIMIT_SPLIT = -1;
+
+ /** milliseconds in one day */
+ public static final long MILLIS_ONE_DAY = 86400000L;
+
+ /**
+ * Splits the source array into multiple array segments using the given
+ * separator, up to a maximum of count items. This will naturally produce
+ * copied byte arrays for each of the split segments. To identify the split
+ * ranges without the array copies, see {@link #splitRanges(byte[], byte[])}.
+ *
+ * @param source
+ * @param separator
+ * @return byte[] array after splitting the source
+ */
+ public static byte[][] split(byte[] source, byte[] separator) {
+ return split(source, separator, NO_LIMIT_SPLIT);
+ }
+
+ /**
+ * Splits the source array into multiple array segments using the given
+ * separator, up to a maximum of count items. This will naturally produce
+ * copied byte arrays for each of the split segments. To identify the split
+ * ranges without the array copies, see {@link #splitRanges(byte[], byte[])}.
+ *
+ * @param source
+ * @param separator
+ * @param limit a non-positive value indicates no limit on number of segments.
+ * @return byte[][] after splitting the input source
+ */
+ public static byte[][] split(byte[] source, byte[] separator, int limit) {
+ List<Range> segments = splitRanges(source, separator, limit);
+
+ byte[][] splits = new byte[segments.size()][];
+ for (int i = 0; i < segments.size(); i++) {
+ Range r = segments.get(i);
+ byte[] tmp = new byte[r.length()];
+ if (tmp.length > 0) {
+ System.arraycopy(source, r.start(), tmp, 0, r.length());
+ }
+ splits[i] = tmp;
+ }
+ return splits;
+ }
+
+ /**
+ * Returns a list of ranges identifying [start, end) -- closed, open --
+ * positions within the source byte array that would be split using the
+ * separator byte array.
+ */
+ public static List<Range> splitRanges(byte[] source, byte[] separator) {
+ return splitRanges(source, separator, NO_LIMIT_SPLIT);
+ }
+
+ /**
+ * Returns a list of ranges identifying [start, end) -- closed, open --
+ * positions within the source byte array that would be split using the
+ * separator byte array.
+ *
+ * @param source the source data
+ * @param separator the separator pattern to look for
+ * @param limit the maximum number of splits to identify in the source
+ */
+ public static List<Range> splitRanges(byte[] source, byte[] separator,
+ int limit) {
+ List<Range> segments = new ArrayList<Range>();
+ if ((source == null) || (separator == null)) {
+ return segments;
+ }
+ int start = 0;
+ itersource: for (int i = 0; i < source.length; i++) {
+ for (int j = 0; j < separator.length; j++) {
+ if (source[i + j] != separator[j]) {
+ continue itersource;
+ }
+ }
+ // all separator elements matched
+ if (limit > 0 && segments.size() >= (limit - 1)) {
+ // everything else goes in one final segment
+ break;
+ }
+ segments.add(new Range(start, i));
+ start = i + separator.length;
+ // i will be incremented again in outer for loop
+ i += separator.length - 1;
+ }
+ // add in remaining to a final range
+ if (start <= source.length) {
+ segments.add(new Range(start, source.length));
+ }
+ return segments;
+ }
+
+ /**
+ * Converts a timestamp into it's inverse timestamp to be used in (row) keys
+ * where we want to have the most recent timestamp in the top of the table
+ * (scans start at the most recent timestamp first).
+ *
+ * @param key value to be inverted so that the latest version will be first in
+ * a scan.
+ * @return inverted long
+ */
+ public static long invertLong(long key) {
+ return Long.MAX_VALUE - key;
+ }
+
+ /**
+ * Converts an int into it's inverse int to be used in (row) keys
+ * where we want to have the largest int value in the top of the table
+ * (scans start at the largest int first).
+ *
+ * @param key value to be inverted so that the latest version will be first in
+ * a scan.
+ * @return inverted int
+ */
+ public static int invertInt(int key) {
+ return Integer.MAX_VALUE - key;
+ }
+
+
+ /**
+ * Converts/encodes a string app Id into a byte representation for (row) keys.
+ * For conversion, we extract cluster timestamp and sequence id from the
+ * string app id (calls {@link ConverterUtils#toApplicationId(String)} for
+ * conversion) and then store it in a byte array of length 12 (8 bytes (long)
+ * for cluster timestamp followed 4 bytes(int) for sequence id). Both cluster
+ * timestamp and sequence id are inverted so that the most recent cluster
+ * timestamp and highest sequence id appears first in the table (i.e.
+ * application id appears in a descending order).
+ *
+ * @param appIdStr application id in string format i.e.
+ * application_{cluster timestamp}_{sequence id with min 4 digits}
+ *
+ * @return encoded byte representation of app id.
+ */
+ public static byte[] encodeAppId(String appIdStr) {
+ ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
+ byte[] appIdBytes = new byte[Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT];
+ byte[] clusterTs = Bytes.toBytes(invertLong(appId.getClusterTimestamp()));
+ System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG);
+ byte[] seqId = Bytes.toBytes(invertInt(appId.getId()));
+ System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT);
+ return appIdBytes;
+ }
+
+ /**
+ * Converts/decodes a 12 byte representation of app id for (row) keys to an
+ * app id in string format which can be returned back to client.
+ * For decoding, 12 bytes are interpreted as 8 bytes of inverted cluster
+ * timestamp(long) followed by 4 bytes of inverted sequence id(int). Calls
+ * {@link ApplicationId#toString} to generate string representation of app id.
+ *
+ * @param appIdBytes application id in byte representation.
+ *
+ * @return decoded app id in string format.
+ */
+ public static String decodeAppId(byte[] appIdBytes) {
+ if (appIdBytes.length != (Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT)) {
+ throw new IllegalArgumentException("Invalid app id in byte format");
+ }
+ long clusterTs = invertLong(Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG));
+ int seqId =
+ invertInt(Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT));
+ return ApplicationId.newInstance(clusterTs, seqId).toString();
+ }
+
+ /**
+ * returns the timestamp of that day's start (which is midnight 00:00:00 AM)
+ * for a given input timestamp
+ *
+ * @param ts
+ * @return timestamp of that day's beginning (midnight)
+ */
+ public static long getTopOfTheDayTimestamp(long ts) {
+ long dayTimestamp = ts - (ts % MILLIS_ONE_DAY);
+ return dayTimestamp;
+ }
+
+ /**
+ * Combines the input array of attributes and the input aggregation operation
+ * into a new array of attributes.
+ *
+ * @param attributes
+ * @param aggOp
+ * @return array of combined attributes
+ */
+ public static Attribute[] combineAttributes(Attribute[] attributes,
+ AggregationOperation aggOp) {
+ int newLength = getNewLengthCombinedAttributes(attributes, aggOp);
+ Attribute[] combinedAttributes = new Attribute[newLength];
+
+ if (attributes != null) {
+ System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length);
+ }
+
+ if (aggOp != null) {
+ Attribute a2 = aggOp.getAttribute();
+ combinedAttributes[newLength - 1] = a2;
+ }
+ return combinedAttributes;
+ }
+
+ /**
+ * Returns a number for the new array size. The new array is the combination
+ * of input array of attributes and the input aggregation operation.
+ *
+ * @param attributes
+ * @param aggOp
+ * @return the size for the new array
+ */
+ private static int getNewLengthCombinedAttributes(Attribute[] attributes,
+ AggregationOperation aggOp) {
+ int oldLength = getAttributesLength(attributes);
+ int aggLength = getAppOpLength(aggOp);
+ return oldLength + aggLength;
+ }
+
+ private static int getAppOpLength(AggregationOperation aggOp) {
+ if (aggOp != null) {
+ return 1;
+ }
+ return 0;
+ }
+
+ private static int getAttributesLength(Attribute[] attributes) {
+ if (attributes != null) {
+ return attributes.length;
+ }
+ return 0;
+ }
+
+ /**
+ * checks if an application has finished
+ *
+ * @param te
+ * @return true if application has finished else false
+ */
+ public static boolean isApplicationFinished(TimelineEntity te) {
+ SortedSet<TimelineEvent> allEvents = te.getEvents();
+ if ((allEvents != null) && (allEvents.size() > 0)) {
+ TimelineEvent event = allEvents.last();
+ if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * get the time at which an app finished
+ *
+ * @param te
+ * @return true if application has finished else false
+ */
+ public static long getApplicationFinishedTime(TimelineEntity te) {
+ SortedSet<TimelineEvent> allEvents = te.getEvents();
+ if ((allEvents != null) && (allEvents.size() > 0)) {
+ TimelineEvent event = allEvents.last();
+ if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
+ return event.getTimestamp();
+ }
+ }
+ return 0l;
+ }
+
+ /**
+ * Checks if the input TimelineEntity object is an ApplicationEntity.
+ *
+ * @param te
+ * @return true if input is an ApplicationEntity, false otherwise
+ */
+ public static boolean isApplicationEntity(TimelineEntity te) {
+ return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
+ }
+
+ /**
+ * Checks for the APPLICATION_CREATED event.
+ *
+ * @param te
+ * @return true is application event exists, false otherwise
+ */
+ public static boolean isApplicationCreated(TimelineEntity te) {
+ if (isApplicationEntity(te)) {
+ for (TimelineEvent event : te.getEvents()) {
+ if (event.getId()
+ .equals(ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Returns the first seen aggregation operation as seen in the list of input
+ * tags or null otherwise
+ *
+ * @param tags
+ * @return AggregationOperation
+ */
+ public static AggregationOperation getAggregationOperationFromTagsList(
+ List<Tag> tags) {
+ for (AggregationOperation aggOp : AggregationOperation.values()) {
+ for (Tag tag : tags) {
+ if (tag.getType() == aggOp.getTagType()) {
+ return aggOp;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Creates a {@link Tag} from the input attribute.
+ *
+ * @param attribute
+ * @return Tag
+ */
+ public static Tag getTagFromAttribute(Entry<String, byte[]> attribute) {
+ // attribute could be either an Aggregation Operation or
+ // an Aggregation Dimension
+ // Get the Tag type from either
+ AggregationOperation aggOp = AggregationOperation
+ .getAggregationOperation(attribute.getKey());
+ if (aggOp != null) {
+ Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
+ return t;
+ }
+
+ AggregationCompactionDimension aggCompactDim = AggregationCompactionDimension
+ .getAggregationCompactionDimension(attribute.getKey());
+ if (aggCompactDim != null) {
+ Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
+ return t;
+ }
+ return null;
+ }
+
+ /**
+ *
+ * @param entityRelations the relations of an entity
+ * @param relationFilters the relations for filtering
+ * @return a boolean flag to indicate if both match
+ */
+ public static boolean matchRelations(
+ Map<String, Set<String>> entityRelations,
+ Map<String, Set<String>> relationFilters) {
+ for (Map.Entry<String, Set<String>> relation : relationFilters.entrySet()) {
+ Set<String> ids = entityRelations.get(relation.getKey());
+ if (ids == null) {
+ return false;
+ }
+ for (String id : relation.getValue()) {
+ if (!ids.contains(id)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ *
+ * @param map the map of key/value pairs in an entity
+ * @param filters the map of key/value pairs for filtering
+ * @return a boolean flag to indicate if both match
+ */
+ public static boolean matchFilters(Map<String, ? extends Object> map,
+ Map<String, ? extends Object> filters) {
+ for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
+ Object value = map.get(filter.getKey());
+ if (value == null) {
+ return false;
+ }
+ if (!value.equals(filter.getValue())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ *
+ * @param entityEvents the set of event objects in an entity
+ * @param eventFilters the set of event Ids for filtering
+ * @return a boolean flag to indicate if both match
+ */
+ public static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
+ Set<String> eventFilters) {
+ Set<String> eventIds = new HashSet<String>();
+ for (TimelineEvent event : entityEvents) {
+ eventIds.add(event.getId());
+ }
+ for (String eventFilter : eventFilters) {
+ if (!eventIds.contains(eventFilter)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ *
+ * @param metrics the set of metric objects in an entity
+ * @param metricFilters the set of metric Ids for filtering
+ * @return a boolean flag to indicate if both match
+ */
+ public static boolean matchMetricFilters(Set<TimelineMetric> metrics,
+ Set<String> metricFilters) {
+ Set<String> metricIds = new HashSet<String>();
+ for (TimelineMetric metric : metrics) {
+ metricIds.add(metric.getId());
+ }
+
+ for (String metricFilter : metricFilters) {
+ if (!metricIds.contains(metricFilter)) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
deleted file mode 100644
index 371371a..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.common;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
-
-/**
- * bunch of utility functions used across TimelineWriter classes
- */
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class TimelineWriterUtils {
-
- /** empty bytes */
- public static final byte[] EMPTY_BYTES = new byte[0];
-
- /** indicator for no limits for splitting */
- public static final int NO_LIMIT_SPLIT = -1;
-
- /** milliseconds in one day */
- public static final long MILLIS_ONE_DAY = 86400000L;
-
- /**
- * Splits the source array into multiple array segments using the given
- * separator, up to a maximum of count items. This will naturally produce
- * copied byte arrays for each of the split segments. To identify the split
- * ranges without the array copies, see
- * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
- *
- * @param source
- * @param separator
- * @return byte[] array after splitting the source
- */
- public static byte[][] split(byte[] source, byte[] separator) {
- return split(source, separator, NO_LIMIT_SPLIT);
- }
-
- /**
- * Splits the source array into multiple array segments using the given
- * separator, up to a maximum of count items. This will naturally produce
- * copied byte arrays for each of the split segments. To identify the split
- * ranges without the array copies, see
- * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
- *
- * @param source
- * @param separator
- * @param limit a non-positive value indicates no limit on number of segments.
- * @return byte[][] after splitting the input source
- */
- public static byte[][] split(byte[] source, byte[] separator, int limit) {
- List<Range> segments = splitRanges(source, separator, limit);
-
- byte[][] splits = new byte[segments.size()][];
- for (int i = 0; i < segments.size(); i++) {
- Range r = segments.get(i);
- byte[] tmp = new byte[r.length()];
- if (tmp.length > 0) {
- System.arraycopy(source, r.start(), tmp, 0, r.length());
- }
- splits[i] = tmp;
- }
- return splits;
- }
-
- /**
- * Returns a list of ranges identifying [start, end) -- closed, open --
- * positions within the source byte array that would be split using the
- * separator byte array.
- */
- public static List<Range> splitRanges(byte[] source, byte[] separator) {
- return splitRanges(source, separator, NO_LIMIT_SPLIT);
- }
-
- /**
- * Returns a list of ranges identifying [start, end) -- closed, open --
- * positions within the source byte array that would be split using the
- * separator byte array.
- *
- * @param source the source data
- * @param separator the separator pattern to look for
- * @param limit the maximum number of splits to identify in the source
- */
- public static List<Range> splitRanges(byte[] source, byte[] separator,
- int limit) {
- List<Range> segments = new ArrayList<Range>();
- if ((source == null) || (separator == null)) {
- return segments;
- }
- int start = 0;
- itersource: for (int i = 0; i < source.length; i++) {
- for (int j = 0; j < separator.length; j++) {
- if (source[i + j] != separator[j]) {
- continue itersource;
- }
- }
- // all separator elements matched
- if (limit > 0 && segments.size() >= (limit - 1)) {
- // everything else goes in one final segment
- break;
- }
-
- segments.add(new Range(start, i));
- start = i + separator.length;
- // i will be incremented again in outer for loop
- i += separator.length - 1;
- }
- // add in remaining to a final range
- if (start <= source.length) {
- segments.add(new Range(start, source.length));
- }
- return segments;
- }
-
- /**
- * Converts a timestamp into it's inverse timestamp to be used in (row) keys
- * where we want to have the most recent timestamp in the top of the table
- * (scans start at the most recent timestamp first).
- *
- * @param key value to be inverted so that the latest version will be first in
- * a scan.
- * @return inverted long
- */
- public static long invert(Long key) {
- return Long.MAX_VALUE - key;
- }
-
- /**
- * returns the timestamp of that day's start (which is midnight 00:00:00 AM)
- * for a given input timestamp
- *
- * @param ts
- * @return timestamp of that day's beginning (midnight)
- */
- public static long getTopOfTheDayTimestamp(long ts) {
- long dayTimestamp = ts - (ts % MILLIS_ONE_DAY);
- return dayTimestamp;
- }
-
- /**
- * Combines the input array of attributes and the input aggregation operation
- * into a new array of attributes.
- *
- * @param attributes
- * @param aggOp
- * @return array of combined attributes
- */
- public static Attribute[] combineAttributes(Attribute[] attributes,
- AggregationOperation aggOp) {
- int newLength = getNewLengthCombinedAttributes(attributes, aggOp);
- Attribute[] combinedAttributes = new Attribute[newLength];
-
- if (attributes != null) {
- System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length);
- }
-
- if (aggOp != null) {
- Attribute a2 = aggOp.getAttribute();
- combinedAttributes[newLength - 1] = a2;
- }
- return combinedAttributes;
- }
-
- /**
- * Returns a number for the new array size. The new array is the combination
- * of input array of attributes and the input aggregation operation.
- *
- * @param attributes
- * @param aggOp
- * @return the size for the new array
- */
- private static int getNewLengthCombinedAttributes(Attribute[] attributes,
- AggregationOperation aggOp) {
- int oldLength = getAttributesLength(attributes);
- int aggLength = getAppOpLength(aggOp);
- return oldLength + aggLength;
- }
-
- private static int getAppOpLength(AggregationOperation aggOp) {
- if (aggOp != null) {
- return 1;
- }
- return 0;
- }
-
- private static int getAttributesLength(Attribute[] attributes) {
- if (attributes != null) {
- return attributes.length;
- }
- return 0;
- }
-
- /**
- * checks if an application has finished
- *
- * @param te
- * @return true if application has finished else false
- */
- public static boolean isApplicationFinished(TimelineEntity te) {
- SortedSet<TimelineEvent> allEvents = te.getEvents();
- if ((allEvents != null) && (allEvents.size() > 0)) {
- TimelineEvent event = allEvents.last();
- if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
- return true;
- }
- }
- return false;
- }
-
- /**
- * get the time at which an app finished
- *
- * @param te
- * @return true if application has finished else false
- */
- public static long getApplicationFinishedTime(TimelineEntity te) {
- SortedSet<TimelineEvent> allEvents = te.getEvents();
- if ((allEvents != null) && (allEvents.size() > 0)) {
- TimelineEvent event = allEvents.last();
- if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
- return event.getTimestamp();
- }
- }
- return 0l;
- }
-
- /**
- * Checks if the input TimelineEntity object is an ApplicationEntity.
- *
- * @param te
- * @return true if input is an ApplicationEntity, false otherwise
- */
- public static boolean isApplicationEntity(TimelineEntity te) {
- return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
- }
-
- /**
- * Checks for the APPLICATION_CREATED event.
- *
- * @param te
- * @return true is application event exists, false otherwise
- */
- public static boolean isApplicationCreated(TimelineEntity te) {
- if (isApplicationEntity(te)) {
- for (TimelineEvent event : te.getEvents()) {
- if (event.getId()
- .equals(ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
- return true;
- }
- }
- }
- return false;
- }
-
- /**
- * Returns the first seen aggregation operation as seen in the list of input
- * tags or null otherwise
- *
- * @param tags
- * @return AggregationOperation
- */
- public static AggregationOperation getAggregationOperationFromTagsList(
- List<Tag> tags) {
- for (AggregationOperation aggOp : AggregationOperation.values()) {
- for (Tag tag : tags) {
- if (tag.getType() == aggOp.getTagType()) {
- return aggOp;
- }
- }
- }
- return null;
- }
-
- /**
- * Creates a {@link Tag} from the input attribute.
- *
- * @param attribute
- * @return Tag
- */
- public static Tag getTagFromAttribute(Entry<String, byte[]> attribute) {
- // attribute could be either an Aggregation Operation or
- // an Aggregation Dimension
- // Get the Tag type from either
- AggregationOperation aggOp = AggregationOperation
- .getAggregationOperation(attribute.getKey());
- if (aggOp != null) {
- Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
- return t;
- }
-
- AggregationCompactionDimension aggCompactDim = AggregationCompactionDimension
- .getAggregationCompactionDimension(attribute.getKey());
- if (aggCompactDim != null) {
- Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
- return t;
- }
- return null;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
index 6a534ed73..e0413c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
/**
* Represents a rowkey for the entity table.
@@ -90,9 +90,9 @@ public class EntityRowKey {
flowId));
// Note that flowRunId is a long, so we can't encode them all at the same
// time.
- byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
- byte[] third = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId));
- return Separator.QUALIFIERS.join(first, second, third);
+ byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
+ byte[] third = TimelineStorageUtils.encodeAppId(appId);
+ return Separator.QUALIFIERS.join(first, second, third, new byte[0]);
}
/**
@@ -114,10 +114,11 @@ public class EntityRowKey {
flowId));
// Note that flowRunId is a long, so we can't encode them all at the same
// time.
- byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
- byte[] third =
- Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType, ""));
- return Separator.QUALIFIERS.join(first, second, third);
+ byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
+ byte[] third = TimelineStorageUtils.encodeAppId(appId);
+ byte[] fourth =
+ Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(entityType, ""));
+ return Separator.QUALIFIERS.join(first, second, third, fourth);
}
/**
@@ -141,11 +142,11 @@ public class EntityRowKey {
flowId));
// Note that flowRunId is a long, so we can't encode them all at the same
// time.
- byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
- byte[] third =
- Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType,
- entityId));
- return Separator.QUALIFIERS.join(first, second, third);
+ byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
+ byte[] third = TimelineStorageUtils.encodeAppId(appId);
+ byte[] fourth =
+ Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(entityType, entityId));
+ return Separator.QUALIFIERS.join(first, second, third, fourth);
}
/**
@@ -166,9 +167,8 @@ public class EntityRowKey {
String flowId =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
long flowRunId =
- TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
- String appId =
- Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4]));
+ TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
+ String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[4]);
String entityType =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[5]));
String entityId =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
index b899e5c..38c0f3f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
/**
@@ -114,7 +114,7 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable>
byte[] columnQualifier = ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifier);
- Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+ Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
combinedAttributes);
@@ -235,7 +235,7 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable>
byte[] columnQualifier = ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifier);
- Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+ Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
combinedAttributes);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
index 18ca599..f7841e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
/**
* Represents a rowkey for the flow activity table.
@@ -71,7 +71,7 @@ public class FlowActivityRowKey {
*/
public static byte[] getRowKey(String clusterId, String userId,
String flowId) {
- long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+ long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
.currentTimeMillis());
return getRowKey(clusterId, dayTs, userId, flowId);
}
@@ -90,7 +90,7 @@ public class FlowActivityRowKey {
String flowId) {
return Separator.QUALIFIERS.join(
Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
- Bytes.toBytes(TimelineWriterUtils.invert(dayTs)),
+ Bytes.toBytes(TimelineStorageUtils.invertLong(dayTs)),
Bytes.toBytes(Separator.QUALIFIERS.encode(userId)),
Bytes.toBytes(Separator.QUALIFIERS.encode(flowId)));
}
@@ -108,7 +108,8 @@ public class FlowActivityRowKey {
String clusterId = Separator.QUALIFIERS.decode(Bytes
.toString(rowKeyComponents[0]));
- long dayTs = TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[1]));
+ long dayTs =
+ TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[1]));
String userId = Separator.QUALIFIERS.decode(Bytes
.toString(rowKeyComponents[2]));
String flowId = Separator.QUALIFIERS.decode(Bytes
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
index ad30add..5079cc0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
/**
@@ -97,7 +97,7 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
TypedBufferedMutator<FlowRunTable> tableMutator, Long timestamp,
Object inputValue, Attribute... attributes) throws IOException {
- Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+ Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
attributes, aggOp);
column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
inputValue, combinedAttributes);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index d55f510..b090bba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
/**
@@ -112,7 +112,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
byte[] columnQualifier = ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifier);
- Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+ Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
combinedAttributes);
@@ -140,7 +140,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
byte[] columnQualifier = ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifier);
- Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+ Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
combinedAttributes);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index f743e5e..1984157 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
public class FlowRunCoprocessor extends BaseRegionObserver {
@@ -89,7 +89,7 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
List<Tag> tags = new ArrayList<>();
if ((attributes != null) && (attributes.size() > 0)) {
for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
- Tag t = TimelineWriterUtils.getTagFromAttribute(attribute);
+ Tag t = TimelineStorageUtils.getTagFromAttribute(attribute);
tags.add(t);
}
byte[] tagByteArray = Tag.fromList(tags);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
index 880d481..7ed3651 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
/**
* Represents a rowkey for the flow run table.
@@ -70,7 +70,7 @@ public class FlowRunRowKey {
userId, flowId));
// Note that flowRunId is a long, so we can't encode them all at the same
// time.
- byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
+ byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
return Separator.QUALIFIERS.join(first, second);
}
@@ -92,7 +92,7 @@ public class FlowRunRowKey {
String flowId =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
long flowRunId =
- TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
+ TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
return new FlowRunRowKey(clusterId, userId, flowId, flowRunId);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index 651bb3a..a537891 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
/**
* Invoked via the coprocessor when a Get or a Scan is issued for flow run
@@ -136,7 +136,7 @@ class FlowScanner implements RegionScanner, Closeable {
// So all cells in one qualifier come one after the other before we see the
// next column qualifier
ByteArrayComparator comp = new ByteArrayComparator();
- byte[] currentColumnQualifier = TimelineWriterUtils.EMPTY_BYTES;
+ byte[] currentColumnQualifier = TimelineStorageUtils.EMPTY_BYTES;
AggregationOperation currentAggOp = null;
SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
Set<String> alreadySeenAggDim = new HashSet<>();
@@ -163,7 +163,7 @@ class FlowScanner implements RegionScanner, Closeable {
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
// We assume that all the operations for a particular column are the same
- return TimelineWriterUtils.getAggregationOperationFromTagsList(tags);
+ return TimelineStorageUtils.getAggregationOperationFromTagsList(tags);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 3b0921b..701615e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -54,7 +54,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -366,7 +366,8 @@ public class TestHBaseTimelineStorage {
String flow = "some_flow_name";
String flowVersion = "AB7822C10F1111";
long runid = 1002345678919L;
- String appName = "some app name";
+ String appName =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1).toString();
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
hbi.stop();
@@ -592,7 +593,8 @@ public class TestHBaseTimelineStorage {
byte[][] karr = (byte[][])e.getKey();
assertEquals(3, karr.length);
assertEquals(eventId, Bytes.toString(karr[0]));
- assertEquals(TimelineWriterUtils.invert(expTs), Bytes.toLong(karr[1]));
+ assertEquals(
+ TimelineStorageUtils.invertLong(expTs), Bytes.toLong(karr[1]));
assertEquals(expKey, Bytes.toString(karr[2]));
Object value = e.getValue();
// there should be only one timestamp and value
@@ -667,7 +669,8 @@ public class TestHBaseTimelineStorage {
String flow = "other_flow_name";
String flowVersion = "1111F01C2287BA";
long runid = 1009876543218L;
- String appName = "some app name";
+ String appName =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1).toString();
byte[] startRow =
EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
@@ -700,7 +703,7 @@ public class TestHBaseTimelineStorage {
byte[][] karr = (byte[][])e.getKey();
assertEquals(3, karr.length);
assertEquals(eventId, Bytes.toString(karr[0]));
- assertEquals(TimelineWriterUtils.invert(expTs),
+ assertEquals(TimelineStorageUtils.invertLong(expTs),
Bytes.toLong(karr[1]));
// key must be empty
assertEquals(0, karr[2].length);