You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by va...@apache.org on 2016/03/04 15:16:14 UTC
hadoop git commit: YARN-4700. ATS storage has one extra record each
time the RM got restarted. (Naganarasimha G R via Varun Saxena)
Repository: hadoop
Updated Branches:
refs/heads/YARN-2928 d491ef080 -> 85513ea85
YARN-4700. ATS storage has one extra record each time the RM got restarted. (Naganarasimha G R via Varun Saxena)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/85513ea8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/85513ea8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/85513ea8
Branch: refs/heads/YARN-2928
Commit: 85513ea8596d5690601123f3b780886896e5bbdf
Parents: d491ef0
Author: Varun Saxena <va...@apache.org>
Authored: Fri Mar 4 19:42:22 2016 +0530
Committer: Varun Saxena <va...@apache.org>
Committed: Fri Mar 4 19:42:22 2016 +0530
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 ++
.../storage/HBaseTimelineWriterImpl.java | 47 +++++++++--------
.../storage/common/TimelineStorageUtils.java | 35 +++----------
.../storage/flow/FlowActivityRowKey.java | 27 +++-------
...stTimelineReaderWebServicesHBaseStorage.java | 25 +++++----
.../storage/flow/TestFlowDataGenerator.java | 22 ++++----
.../flow/TestHBaseStorageFlowActivity.java | 53 ++++++++++++--------
.../storage/flow/TestHBaseStorageFlowRun.java | 4 +-
8 files changed, 99 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85513ea8/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 4c77b67..4b7fd2c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -235,6 +235,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-4409. Fix javadoc and checkstyle issues in timelineservice code (Varun
Saxena via sjlee)
+ YARN-4700. ATS storage has one extra record each time the RM got restarted.
+ (Naganarasimha G R via varunsaxena)
+
Trunk - Unreleased
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85513ea8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 997b175..1afe878 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
@@ -53,11 +54,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
@@ -140,19 +141,22 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
storeRelations(rowKey, te, isApplication);
if (isApplication) {
- if (TimelineStorageUtils.isApplicationCreated(te)) {
+ TimelineEvent event = TimelineStorageUtils.getApplicationEvent(te,
+ ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ if (event != null) {
onApplicationCreated(clusterId, userId, flowName, flowVersion,
- flowRunId, appId, te);
+ flowRunId, appId, te, event.getTimestamp());
}
// if it's an application entity, store metrics
storeFlowMetricsAppRunning(clusterId, userId, flowName, flowRunId,
appId, te);
// if application has finished, store its finish time and write final
- // values
- // of all metrics
- if (TimelineStorageUtils.isApplicationFinished(te)) {
+ // values of all metrics
+ event = TimelineStorageUtils.getApplicationEvent(te,
+ ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+ if (event != null) {
onApplicationFinished(clusterId, userId, flowName, flowVersion,
- flowRunId, appId, te);
+ flowRunId, appId, te, event.getTimestamp());
}
}
}
@@ -161,7 +165,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
private void onApplicationCreated(String clusterId, String userId,
String flowName, String flowVersion, long flowRunId, String appId,
- TimelineEntity te) throws IOException {
+ TimelineEntity te, long appCreatedTimeStamp) throws IOException {
// store in App to flow table
storeInAppToFlowTable(clusterId, userId, flowName, flowRunId, appId, te);
// store in flow run table
@@ -169,7 +173,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
flowRunId, appId, te);
// store in flow activity table
storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
- flowRunId, appId, te);
+ flowRunId, appId, appCreatedTimeStamp);
}
/*
@@ -178,8 +182,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
*/
private void storeInFlowActivityTable(String clusterId, String userId,
String flowName, String flowVersion, long flowRunId, String appId,
- TimelineEntity te) throws IOException {
- byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, userId, flowName);
+ long activityTimeStamp) throws IOException {
+ byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, activityTimeStamp,
+ userId, flowName);
byte[] qualifier = GenericObjectMapper.write(flowRunId);
FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
null, flowVersion,
@@ -214,28 +219,28 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
*/
private void onApplicationFinished(String clusterId, String userId,
String flowName, String flowVersion, long flowRunId, String appId,
- TimelineEntity te) throws IOException {
+ TimelineEntity te, long appFinishedTimeStamp) throws IOException {
// store in flow run table
storeAppFinishedInFlowRunTable(clusterId, userId, flowName, flowRunId,
- appId, te);
+ appId, te, appFinishedTimeStamp);
// indicate in the flow activity table that the app has finished
storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
- flowRunId, appId, te);
+ flowRunId, appId, appFinishedTimeStamp);
}
/*
* Update the {@link FlowRunTable} with Application Finished information
*/
private void storeAppFinishedInFlowRunTable(String clusterId, String userId,
- String flowName, long flowRunId, String appId, TimelineEntity te)
- throws IOException {
- byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
- flowRunId);
- Attribute attributeAppId = AggregationCompactionDimension.APPLICATION_ID
- .getAttribute(appId);
+ String flowName, long flowRunId, String appId, TimelineEntity te,
+ long appFinishedTimeStamp) throws IOException {
+ byte[] rowKey =
+ FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId);
+ Attribute attributeAppId =
+ AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId);
FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
- TimelineStorageUtils.getApplicationFinishedTime(te), attributeAppId);
+ appFinishedTimeStamp, attributeAppId);
// store the final value of metrics since application has finished
Set<TimelineMetric> metrics = te.getMetrics();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85513ea8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index 2328bba..605dbe7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -21,9 +21,9 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
-import java.util.Map.Entry;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -308,24 +308,6 @@ public final class TimelineStorageUtils {
}
/**
- * get the time at which an app finished.
- *
- * @param te TimelineEntity object.
- * @return true if application has finished else false
- */
- public static long getApplicationFinishedTime(TimelineEntity te) {
- SortedSet<TimelineEvent> allEvents = te.getEvents();
- if ((allEvents != null) && (allEvents.size() > 0)) {
- TimelineEvent event = allEvents.last();
- if (event.getId().equals(
- ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
- return event.getTimestamp();
- }
- }
- return 0L;
- }
-
- /**
* Checks if the input TimelineEntity object is an ApplicationEntity.
*
* @param te TimelineEntity object.
@@ -336,21 +318,20 @@ public final class TimelineStorageUtils {
}
/**
- * Checks for the APPLICATION_CREATED event.
- *
* @param te TimelineEntity object.
- * @return true is application event exists, false otherwise
+ * @param eventId event with this id needs to be fetched
+ * @return TimelineEvent if TimelineEntity contains the desired event,
+ *         null if te is not an application entity or has no such event.
*/
- public static boolean isApplicationCreated(TimelineEntity te) {
+ public static TimelineEvent getApplicationEvent(TimelineEntity te,
+ String eventId) {
if (isApplicationEntity(te)) {
for (TimelineEvent event : te.getEvents()) {
- if (event.getId()
- .equals(ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
- return true;
+ if (event.getId().equals(eventId)) {
+ return event;
}
}
}
- return false;
+ return null;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85513ea8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
index 80b3287..2726ae2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -83,36 +83,21 @@ public class FlowActivityRowKey {
/**
* Constructs a row key for the flow activity table as follows:
* {@code clusterId!dayTimestamp!user!flowName}.
- * Will insert into current day's record in the table. Uses current time to
- * store top of the day timestamp.
*
* @param clusterId Cluster Id.
- * @param userId User Id.
- * @param flowName Flow Name.
- * @return byte array with the row key prefix
- */
- public static byte[] getRowKey(String clusterId, String userId,
- String flowName) {
- long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
- .currentTimeMillis());
- return getRowKey(clusterId, dayTs, userId, flowName);
- }
-
- /**
- * Constructs a row key for the flow activity table as follows:
- * {@code clusterId!dayTimestamp!user!flowName}.
- *
- * @param clusterId Cluster Id.
- * @param dayTs Top of the day timestamp.
+ * @param eventTs event's TimeStamp.
* @param userId User Id.
* @param flowName Flow Name.
* @return byte array for the row key
*/
- public static byte[] getRowKey(String clusterId, long dayTs, String userId,
+ public static byte[] getRowKey(String clusterId, long eventTs, String userId,
String flowName) {
+ // convert it to Day's time stamp
+ eventTs = TimelineStorageUtils.getTopOfTheDayTimestamp(eventTs);
+
return Separator.QUALIFIERS.join(
Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
- Bytes.toBytes(TimelineStorageUtils.invertLong(dayTs)),
+ Bytes.toBytes(TimelineStorageUtils.invertLong(eventTs)),
Bytes.toBytes(Separator.QUALIFIERS.encode(userId)),
Bytes.toBytes(Separator.QUALIFIERS.encode(flowName)));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85513ea8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
index 9eaa3de..009b488 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
@@ -62,8 +62,8 @@ import org.junit.Test;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
@@ -128,15 +128,14 @@ public class TestTimelineReaderWebServicesHBaseStorage {
TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- Long expTs = 1436512802000L;
- event.setTimestamp(expTs);
+ event.setTimestamp(cTime);
String expKey = "foo_event";
Object expVal = "test";
event.addInfo(expKey, expVal);
entity.addEvent(event);
TimelineEvent event11 = new TimelineEvent();
event11.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
- expTs = 1436512802010L;
+ Long expTs = 1425019501000L;
event11.setTimestamp(expTs);
entity.addEvent(event11);
@@ -165,7 +164,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
entity1.addMetrics(metrics);
TimelineEvent event1 = new TimelineEvent();
event1.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- event1.setTimestamp(expTs);
+ event1.setTimestamp(cTime);
event1.addInfo(expKey, expVal);
entity1.addEvent(event1);
te1.addEntity(entity1);
@@ -182,7 +181,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
entity3.setCreatedTime(cTime);
TimelineEvent event2 = new TimelineEvent();
event2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- event2.setTimestamp(1436512802037L);
+ event2.setTimestamp(cTime);
event2.addInfo("foo_event", "test");
entity3.addEvent(event2);
te3.addEntity(entity3);
@@ -196,7 +195,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
entity4.setCreatedTime(cTime);
TimelineEvent event4 = new TimelineEvent();
event4.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- event4.setTimestamp(1436512802037L);
+ event4.setTimestamp(cTime);
event4.addInfo("foo_event", "test");
entity4.addEvent(event4);
te4.addEntity(entity4);
@@ -785,10 +784,14 @@ public class TestTimelineReaderWebServicesHBaseStorage {
assertNotNull(entities);
assertEquals(1, entities.size());
+ long firstFlowActivity =
+ TimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L);
+
DateFormat fmt = TimelineReaderWebServices.DATE_FORMAT.get();
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/clusters/cluster1/flows?daterange=" + fmt.format(dayTs) +
- "-" + fmt.format(dayTs + (2*86400000L)));
+ "timeline/clusters/cluster1/flows?daterange="
+ + fmt.format(firstFlowActivity) + "-"
+ + fmt.format(dayTs));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
assertNotNull(entities);
@@ -810,7 +813,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/clusters/cluster1/flows?daterange=-" +
- fmt.format(dayTs + (2*86400000L)));
+ fmt.format(dayTs));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
assertNotNull(entities);
@@ -818,7 +821,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/clusters/cluster1/flows?daterange=" +
- fmt.format(dayTs - (2*86400000L)) + "-");
+ fmt.format(firstFlowActivity) + "-");
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
assertNotNull(entities);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85513ea8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
index a4c06f2..d45df57 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
@@ -103,8 +103,7 @@ class TestFlowDataGenerator {
String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id);
entity.setType(type);
- long cTime = 20000000000000L;
- long mTime = 1425026901000L;
+ long cTime = 1425026901000L;
entity.setCreatedTime(cTime);
// add metrics
Set<TimelineMetric> metrics = new HashSet<>();
@@ -125,8 +124,7 @@ class TestFlowDataGenerator {
TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- long expTs = 1436512802000L;
- event.setTimestamp(expTs);
+ event.setTimestamp(cTime);
String expKey = "foo_event";
Object expVal = "test";
event.addInfo(expKey, expVal);
@@ -134,7 +132,8 @@ class TestFlowDataGenerator {
event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
- event.setTimestamp(1436512801000L);
+ long expTs = cTime + 21600000;// start time + 6hrs
+ event.setTimestamp(expTs);
event.addInfo(expKey, expVal);
entity.addEvent(event);
@@ -149,8 +148,7 @@ class TestFlowDataGenerator {
entity.setType(type);
TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- long endTs = 1439379885000L;
- event.setTimestamp(endTs);
+ event.setTimestamp(startTs);
String expKey = "foo_event_greater";
String expVal = "test_app_greater";
event.addInfo(expKey, expVal);
@@ -181,25 +179,23 @@ class TestFlowDataGenerator {
entity.setCreatedTime(startTs);
TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- event.setTimestamp(System.currentTimeMillis());
+ event.setTimestamp(startTs);
entity.addEvent(event);
return entity;
}
- static TimelineEntity getFlowApp1() {
+ static TimelineEntity getFlowApp1(long appCreatedTime) {
TimelineEntity entity = new TimelineEntity();
String id = "flowActivity_test";
String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id);
entity.setType(type);
- long cTime = 1425016501000L;
- entity.setCreatedTime(cTime);
+ entity.setCreatedTime(appCreatedTime);
TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- long expTs = 1436512802000L;
- event.setTimestamp(expTs);
+ event.setTimestamp(appCreatedTime);
String expKey = "foo_event";
Object expVal = "test";
event.addInfo(expKey, expVal);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85513ea8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
index 9161902..6b23b6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -112,9 +112,9 @@ public class TestHBaseStorageFlowActivity {
String flowVersion = "CF7022C10F1354";
long runid = 1002345678919L;
String appName = "application_100000000000_1111";
- long minStartTs = 10000000000000L;
- long greaterStartTs = 30000000000000L;
- long endTs = 1439750690000L;
+ long minStartTs = 1424995200300L;
+ long greaterStartTs = 1424995200300L + 864000L;
+ long endTs = 1424995200300L + 86000000L;;
TimelineEntity entityMinStartTime = TestFlowDataGenerator
.getEntityMinStartTime(minStartTs);
@@ -155,7 +155,8 @@ public class TestHBaseStorageFlowActivity {
// check in flow activity table
Table table1 = conn.getTable(TableName
.valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
- byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+ byte[] startRow =
+ FlowActivityRowKey.getRowKey(cluster, minStartTs, user, flow);
Get g = new Get(startRow);
Result r1 = table1.get(g);
assertNotNull(r1);
@@ -169,8 +170,7 @@ public class TestHBaseStorageFlowActivity {
assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowName());
- long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
- .currentTimeMillis());
+ long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs);
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
assertEquals(1, values.size());
checkFlowActivityRunId(runid, flowVersion, values);
@@ -216,7 +216,9 @@ public class TestHBaseStorageFlowActivity {
long runid = 1001111178919L;
TimelineEntities te = new TimelineEntities();
- TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
+ long appCreatedTime = 1425016501000L;
+ TimelineEntity entityApp1 =
+ TestFlowDataGenerator.getFlowApp1(appCreatedTime);
te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null;
@@ -231,7 +233,8 @@ public class TestHBaseStorageFlowActivity {
hbi.close();
}
// check flow activity
- checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1);
+ checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1,
+ appCreatedTime);
// use the reader to verify the data
HBaseTimelineReaderImpl hbr = null;
@@ -262,13 +265,16 @@ public class TestHBaseStorageFlowActivity {
}
private void checkFlowActivityTable(String cluster, String user, String flow,
- String flowVersion, long runid, Configuration c1) throws IOException {
+ String flowVersion, long runid, Configuration c1, long appCreatedTime)
+ throws IOException {
Scan s = new Scan();
s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
- byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+ byte[] startRow =
+ FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow);
s.setStartRow(startRow);
String clusterStop = cluster + "1";
- byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow);
+ byte[] stopRow =
+ FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow);
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
Table table1 = conn.getTable(TableName
@@ -288,8 +294,7 @@ public class TestHBaseStorageFlowActivity {
assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowName());
- long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
- .currentTimeMillis());
+ long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
assertEquals(1, values.size());
checkFlowActivityRunId(runid, flowVersion, values);
@@ -319,7 +324,9 @@ public class TestHBaseStorageFlowActivity {
long runid3 = 3333333333333L;
TimelineEntities te = new TimelineEntities();
- TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
+ long appCreatedTime = 1425016501000L;
+ TimelineEntity entityApp1 =
+ TestFlowDataGenerator.getFlowApp1(appCreatedTime);
te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null;
@@ -348,7 +355,7 @@ public class TestHBaseStorageFlowActivity {
}
// check flow activity
checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
- runid1, flowVersion2, runid2, flowVersion3, runid3);
+ runid1, flowVersion2, runid2, flowVersion3, runid3, appCreatedTime);
// use the timeline reader to verify data
HBaseTimelineReaderImpl hbr = null;
@@ -369,8 +376,8 @@ public class TestHBaseStorageFlowActivity {
assertEquals(cluster, flowActivity.getCluster());
assertEquals(user, flowActivity.getUser());
assertEquals(flow, flowActivity.getFlowName());
- long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
- .currentTimeMillis());
+ long dayTs =
+ TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
assertEquals(dayTs, flowActivity.getDate().getTime());
Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
assertEquals(3, flowRuns.size());
@@ -395,14 +402,17 @@ public class TestHBaseStorageFlowActivity {
private void checkFlowActivityTableSeveralRuns(String cluster, String user,
String flow, Configuration c1, String flowVersion1, long runid1,
- String flowVersion2, long runid2, String flowVersion3, long runid3)
+ String flowVersion2, long runid2, String flowVersion3, long runid3,
+ long appCreatedTime)
throws IOException {
Scan s = new Scan();
s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
- byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+ byte[] startRow =
+ FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow);
s.setStartRow(startRow);
String clusterStop = cluster + "1";
- byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow);
+ byte[] stopRow =
+ FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow);
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
Table table1 = conn.getTable(TableName
@@ -419,8 +429,7 @@ public class TestHBaseStorageFlowActivity {
assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowName());
- long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
- .currentTimeMillis());
+ long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
Map<byte[], byte[]> values = result
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85513ea8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index 9504799..b234bfd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -52,8 +52,8 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -114,7 +114,7 @@ public class TestHBaseStorageFlowRun {
String flowVersion = "CF7022C10F1354";
long runid = 1002345678919L;
String appName = "application_100000000000_1111";
- long minStartTs = 10000000000000L;
+ long minStartTs = 1425026900000L;
long greaterStartTs = 30000000000000L;
long endTs = 1439750690000L;
TimelineEntity entityMinStartTime = TestFlowDataGenerator