You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/07/21 18:24:45 UTC
tez git commit: TEZ-3357. Change TimelineCachePlugin to handle DAG
grouping. (Harish Jaiprakash via hitesh)
Repository: tez
Updated Branches:
refs/heads/master e02e4f721 -> 9930011b0
TEZ-3357. Change TimelineCachePlugin to handle DAG grouping. (Harish Jaiprakash via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9930011b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9930011b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9930011b
Branch: refs/heads/master
Commit: 9930011b0f05d56ece049867a71fb7eebfe6442e
Parents: e02e4f7
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Jul 21 11:23:33 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Jul 21 11:23:33 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tez/dag/api/TezConfiguration.java | 34 +++-
.../org/apache/tez/dag/records/TezDAGID.java | 19 +++
.../logging/ats/TimelineCachePluginImpl.java | 110 ++++++++----
.../ats/TestTimelineCachePluginImpl.java | 170 ++++++++++++++++---
5 files changed, 274 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/9930011b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dd89640..a925104 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3357. Change TimelineCachePlugin to handle DAG grouping.
TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat.
TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used.
TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for large dag plans.
@@ -85,6 +86,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3357. Change TimelineCachePlugin to handle DAG grouping.
TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat.
TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used.
TEZ-3329. Tez ATS data is incomplete for a vertex which fails or gets killed before initialization
http://git-wip-us.apache.org/repos/asf/tez/blob/9930011b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 1de2eda..11c50cf 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1229,6 +1229,38 @@ public class TezConfiguration extends Configuration {
"org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService";
/**
+ * Comma-separated list of integers. These are the values that were previously set for the
+ * config {@value #TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP}. The older values are required so
+ * that the plugin continues to generate the groupIds that were generated previously. If an older
+ * value is not present then the UI may not show information for DAGs which were created
+ * with a different grouping value.
+ *
+ * Note: Do not add too many values here as it will affect the performance of Yarn Timeline
+ * Server/Tez UI due to the need to scan for more log files.
+ */
+ @Private
+ @Unstable
+ @ConfigurationScope(Scope.AM)
+ @ConfigurationProperty
+ public static final String TEZ_HISTORY_LOGGING_TIMELINE_CACHE_PLUGIN_OLD_NUM_DAGS_PER_GROUP =
+ TEZ_PREFIX + "history.logging.timeline-cache-plugin.old-num-dags-per-group";
+
+ /**
+ * Integer value. Number of DAGs to be grouped together. This is used by the history logging
+ * service to generate groupIds such that every numDagsPerGroup consecutive DAGs share the same
+ * groupId in a given session. If the value is set to 1 then grouping is disabled. This config
+ * controls the number of DAGs written into one log file, and hence the number of files created
+ * in the Filesystem used by YARN Timeline.
+ */
+ @Private
+ @Unstable
+ @ConfigurationScope(Scope.AM)
+ @ConfigurationProperty(type="integer")
+ public static final String TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP =
+ TEZ_PREFIX + "history.timeline.num-dags-per-group";
+ public static final int TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP_DEFAULT = 1;
+
+ /**
* String value. The directory into which history data will be written. This defaults to the
* container logging directory. This is relevant only when SimpleHistoryLoggingService is being
* used for {@link TezConfiguration#TEZ_HISTORY_LOGGING_SERVICE_CLASS}
@@ -1237,7 +1269,7 @@ public class TezConfiguration extends Configuration {
@ConfigurationProperty
public static final String TEZ_SIMPLE_HISTORY_LOGGING_DIR =
TEZ_PREFIX + "simple.history.logging.dir";
-
+
/**
* Int value. Maximum errors allowed while logging history data. After crossing this limit history
* logging gets disabled. The job continues to run after this.
http://git-wip-us.apache.org/repos/asf/tez/blob/9930011b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
index 3828890..58ab509 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
@@ -176,6 +176,25 @@ public class TezDAGID extends TezID {
return appendTo(new StringBuilder(DAG)).toString();
}
+ // The groupId prefix.
+ private static final String DAG_GROUPID_PREFIX = "daggroup";
+
+ /**
+ * Generate a DAG group id which groups multiple DAGs into one group.
+ *
+ * @param numDagsPerGroup The number of DAGs present in one group; must be greater than 1.
+ * @return The group id to be used for grouping numDagsPerGroup DAGs into one group.
+ */
+ public String getGroupId(int numDagsPerGroup) {
+ if (numDagsPerGroup <= 1) {
+ throw new IllegalArgumentException("numDagsPerGroup has to be more than one. Got: " + numDagsPerGroup);
+ }
+ return DAG_GROUPID_PREFIX + SEPARATOR +
+ getApplicationId().getClusterTimestamp() + SEPARATOR +
+ tezAppIdFormat.get().format(getApplicationId().getId()) + SEPARATOR +
+ tezDagIdFormat.get().format(getId() / numDagsPerGroup);
+ }
+
public static TezDAGID fromString(String dagId) {
int id = 0;
int appId = 0;
http://git-wip-us.apache.org/repos/asf/tez/blob/9930011b/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java b/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java
index d81f56a..b4217a1 100644
--- a/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java
+++ b/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java
@@ -23,20 +23,26 @@ import java.util.HashSet;
import java.util.Set;
import java.util.SortedSet;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
import org.apache.hadoop.yarn.server.timeline.TimelineEntityGroupPlugin;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.history.logging.EntityTypes;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
-public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin {
+public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin implements Configurable {
+ private static final Logger LOG = LoggerFactory.getLogger(TimelineCachePluginImpl.class);
private static Set<String> summaryEntityTypes;
private static Set<String> knownEntityTypes;
@@ -54,11 +60,27 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin {
EntityTypes.TEZ_APPLICATION.name());
}
+ private Configuration conf;
+
+ private Set<Integer> allNumGroupsPerDag;
+
// Empty public constructor
public TimelineCachePluginImpl() {
+ setConf(new TezConfiguration());
+ }
+
+ private Set<TimelineEntityGroupId> createTimelineEntityGroupIds(TezDAGID dagId) {
+ ApplicationId appId = dagId.getApplicationId();
+ HashSet<TimelineEntityGroupId> groupIds = Sets.newHashSet(
+ TimelineEntityGroupId.newInstance(appId, appId.toString()),
+ TimelineEntityGroupId.newInstance(appId, dagId.toString()));
+ for (int numGroupsPerDag : allNumGroupsPerDag) {
+ groupIds.add(TimelineEntityGroupId.newInstance(appId, dagId.getGroupId(numGroupsPerDag)));
+ }
+ return groupIds;
}
- private TimelineEntityGroupId convertToTimelineEntityGroupId(String entityType, String entityId) {
+ private Set<TimelineEntityGroupId> convertToTimelineEntityGroupIds(String entityType, String entityId) {
if (entityType == null || entityType.isEmpty()
|| entityId == null || entityId.isEmpty()) {
return null;
@@ -66,27 +88,23 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin {
if (entityType.equals(EntityTypes.TEZ_DAG_ID.name())) {
TezDAGID dagId = TezDAGID.fromString(entityId);
if (dagId != null) {
- return TimelineEntityGroupId.newInstance(dagId.getApplicationId(), dagId.toString());
+ return createTimelineEntityGroupIds(dagId);
}
} else if (entityType.equals(EntityTypes.TEZ_VERTEX_ID.name())) {
TezVertexID vertexID = TezVertexID.fromString(entityId);
if (vertexID != null) {
- return TimelineEntityGroupId.newInstance(vertexID.getDAGId().getApplicationId(),
- vertexID.getDAGId().toString());
+ return createTimelineEntityGroupIds(vertexID.getDAGId());
}
} else if (entityType.equals(EntityTypes.TEZ_TASK_ID.name())) {
TezTaskID taskID = TezTaskID.fromString(entityId);
if (taskID != null) {
- return TimelineEntityGroupId.newInstance(taskID.getVertexID().getDAGId().getApplicationId(),
- taskID.getVertexID().getDAGId().toString());
+ return createTimelineEntityGroupIds(taskID.getVertexID().getDAGId());
}
} else if (entityType.equals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name())) {
TezTaskAttemptID taskAttemptID = TezTaskAttemptID.fromString(entityId);
if (taskAttemptID != null) {
- return TimelineEntityGroupId.newInstance(
- taskAttemptID.getTaskID().getVertexID().getDAGId().getApplicationId(),
- taskAttemptID.getTaskID().getVertexID().getDAGId().toString());
+ return createTimelineEntityGroupIds(taskAttemptID.getTaskID().getVertexID().getDAGId());
}
} else if (entityType.equals(EntityTypes.TEZ_CONTAINER_ID.name())) {
String cId = entityId;
@@ -95,9 +113,9 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin {
}
ContainerId containerId = ContainerId.fromString(cId);
if (containerId != null) {
- return TimelineEntityGroupId.newInstance(
+ return Sets.newHashSet(TimelineEntityGroupId.newInstance(
containerId.getApplicationAttemptId().getApplicationId(),
- containerId.getApplicationAttemptId().getApplicationId().toString());
+ containerId.getApplicationAttemptId().getApplicationId().toString()));
}
}
return null;
@@ -113,15 +131,7 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin {
|| summaryEntityTypes.contains(entityType)) {
return null;
}
- TimelineEntityGroupId groupId = convertToTimelineEntityGroupId(primaryFilter.getName(),
- primaryFilter.getValue().toString());
- if (groupId != null) {
- TimelineEntityGroupId appGroupId =
- TimelineEntityGroupId.newInstance(groupId.getApplicationId(),
- groupId.getApplicationId().toString());
- return Sets.newHashSet(groupId, appGroupId);
- }
- return null;
+ return convertToTimelineEntityGroupIds(primaryFilter.getName(), primaryFilter.getValue().toString());
}
@Override
@@ -129,14 +139,7 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin {
if (!knownEntityTypes.contains(entityType) || summaryEntityTypes.contains(entityType)) {
return null;
}
- TimelineEntityGroupId groupId = convertToTimelineEntityGroupId(entityType, entityId);
- if (groupId != null) {
- TimelineEntityGroupId appGroupId =
- TimelineEntityGroupId.newInstance(groupId.getApplicationId(),
- groupId.getApplicationId().toString());
- return Sets.newHashSet(groupId, appGroupId);
- }
- return null;
+ return convertToTimelineEntityGroupIds(entityType, entityId);
}
@Override
@@ -147,20 +150,53 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin {
|| entityIds == null || entityIds.isEmpty()) {
return null;
}
- Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
- Set<ApplicationId> appIdSet = new HashSet<ApplicationId>();
+ Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
for (String entityId : entityIds) {
- TimelineEntityGroupId groupId = convertToTimelineEntityGroupId(entityType, entityId);
+ Set<TimelineEntityGroupId> groupId = convertToTimelineEntityGroupIds(entityType, entityId);
if (groupId != null) {
- groupIds.add(groupId);
- appIdSet.add(groupId.getApplicationId());
+ groupIds.addAll(groupId);
}
}
- for (ApplicationId appId : appIdSet) {
- groupIds.add(TimelineEntityGroupId.newInstance(appId, appId.toString()));
- }
return groupIds;
}
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf instanceof TezConfiguration ? conf : new TezConfiguration(conf);
+
+ this.allNumGroupsPerDag = loadAllNumDagsPerGroup();
+ }
+
+ private Set<Integer> loadAllNumDagsPerGroup() {
+ Set<Integer> allNumDagsPerGroup = new HashSet<Integer>();
+
+ int numDagsPerGroup = conf.getInt(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP,
+ TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP_DEFAULT);
+ if (numDagsPerGroup > 1) {
+ // Add current numDagsPerGroup from config.
+ allNumDagsPerGroup.add(numDagsPerGroup);
+ }
+
+ // Add the older values from config.
+ int [] usedNumGroups = conf.getInts(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_CACHE_PLUGIN_OLD_NUM_DAGS_PER_GROUP);
+ if (usedNumGroups != null) {
+ for (int i = 0; i < usedNumGroups.length; ++i) {
+ allNumDagsPerGroup.add(usedNumGroups[i]);
+ }
+ }
+
+ // Warn for performance impact
+ if (allNumDagsPerGroup.size() > 3) {
+ LOG.warn("Too many entries in " + TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_CACHE_PLUGIN_OLD_NUM_DAGS_PER_GROUP +
+ ", this can result in slower lookup from Yarn Timeline server or slower load times in TezUI.");
+ }
+ return allNumDagsPerGroup;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/9930011b/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java b/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java
index 562a66e..6f819ba 100644
--- a/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java
+++ b/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java
@@ -27,11 +27,14 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.history.logging.EntityTypes;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -43,6 +46,7 @@ import org.junit.Test;
import com.google.common.collect.Sets;
+
public class TestTimelineCachePluginImpl {
static ApplicationId appId1;
@@ -61,8 +65,20 @@ public class TestTimelineCachePluginImpl {
static Map<String, String> typeIdMap1;
static Map<String, String> typeIdMap2;
- TimelineCachePluginImpl plugin =
- new TimelineCachePluginImpl();
+ private static TimelineCachePluginImpl createPlugin(int numDagsPerGroup, String usedNumDagsPerGroup) {
+ Configuration conf = new Configuration(false);
+ if (numDagsPerGroup > 0) {
+ conf.setInt(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP, numDagsPerGroup);
+ }
+ if (usedNumDagsPerGroup != null) {
+ conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_CACHE_PLUGIN_OLD_NUM_DAGS_PER_GROUP, usedNumDagsPerGroup);
+ }
+ if (numDagsPerGroup > 0 || usedNumDagsPerGroup != null) {
+ return ReflectionUtils.newInstance(TimelineCachePluginImpl.class, conf);
+ } else {
+ return new TimelineCachePluginImpl();
+ }
+ }
@BeforeClass
public static void beforeClass() {
@@ -94,11 +110,11 @@ public class TestTimelineCachePluginImpl {
typeIdMap2.put(EntityTypes.TEZ_VERTEX_ID.name(), vertexID2.toString());
typeIdMap2.put(EntityTypes.TEZ_TASK_ID.name(), taskID2.toString());
typeIdMap2.put(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), attemptID2.toString());
-
}
@Test
public void testGetTimelineEntityGroupIdByPrimaryFilter() {
+ TimelineCachePluginImpl plugin = createPlugin(100, null);
for (Entry<String, String> entry : typeIdMap1.entrySet()) {
NameValuePair primaryFilter = new NameValuePair(entry.getKey(), entry.getValue());
Assert.assertNull(plugin.getTimelineEntityGroupId(EntityTypes.TEZ_APPLICATION.name(),
@@ -108,19 +124,38 @@ public class TestTimelineCachePluginImpl {
Assert.assertNull(groupIds);
continue;
}
+ Assert.assertEquals(3, groupIds.size());
+ Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
+ while (iter.hasNext()) {
+ TimelineEntityGroupId groupId = iter.next();
+ Assert.assertEquals(appId1, groupId.getApplicationId());
+ Assert.assertTrue(getGroupIds(dagID1, appId1, 100).contains(groupId.getTimelineEntityGroupId()));
+ }
+ }
+ }
+
+ @Test
+ public void testGetTimelineEntityGroupIdByIdDefaultConfig() {
+ TimelineCachePluginImpl plugin = createPlugin(-1, null);
+ for (Entry<String, String> entry : typeIdMap1.entrySet()) {
+ Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(), entry.getKey());
+ if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) {
+ Assert.assertNull(groupIds);
+ continue;
+ }
Assert.assertEquals(2, groupIds.size());
Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
while (iter.hasNext()) {
TimelineEntityGroupId groupId = iter.next();
Assert.assertEquals(appId1, groupId.getApplicationId());
- Assert.assertTrue((dagID1.toString().equals(groupId.getTimelineEntityGroupId()))
- || (appId1.toString().equals(groupId.getTimelineEntityGroupId())));
+ Assert.assertTrue(getGroupIds(dagID1, appId1).contains(groupId.getTimelineEntityGroupId()));
}
}
}
@Test
- public void testGetTimelineEntityGroupIdById() {
+ public void testGetTimelineEntityGroupIdByIdNoGroupingConf() {
+ TimelineCachePluginImpl plugin = createPlugin(1, null);
for (Entry<String, String> entry : typeIdMap1.entrySet()) {
Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(), entry.getKey());
if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) {
@@ -132,14 +167,90 @@ public class TestTimelineCachePluginImpl {
while (iter.hasNext()) {
TimelineEntityGroupId groupId = iter.next();
Assert.assertEquals(appId1, groupId.getApplicationId());
- Assert.assertTrue((dagID1.toString().equals(groupId.getTimelineEntityGroupId()))
- || (appId1.toString().equals(groupId.getTimelineEntityGroupId())));
+ Assert.assertTrue(getGroupIds(dagID1, appId1).contains(groupId.getTimelineEntityGroupId()));
+ }
+ }
+ }
+
+ @Test
+ public void testGetTimelineEntityGroupIdById() {
+ TimelineCachePluginImpl plugin = createPlugin(100, null);
+ for (Entry<String, String> entry : typeIdMap1.entrySet()) {
+ Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(), entry.getKey());
+ if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) {
+ Assert.assertNull(groupIds);
+ continue;
+ }
+ Assert.assertEquals(3, groupIds.size());
+ Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
+ while (iter.hasNext()) {
+ TimelineEntityGroupId groupId = iter.next();
+ Assert.assertEquals(appId1, groupId.getApplicationId());
+ Assert.assertTrue(getGroupIds(dagID1, appId1, 100).contains(groupId.getTimelineEntityGroupId()));
+ }
+ }
+ }
+
+ @Test
+ public void testGetTimelineEntityGroupIdByIdWithOldGroupIdsSingle() {
+ TimelineCachePluginImpl plugin = createPlugin(100, "50");
+ for (Entry<String, String> entry : typeIdMap2.entrySet()) {
+ Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(), entry.getKey());
+ if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) {
+ Assert.assertNull(groupIds);
+ continue;
+ }
+ Assert.assertEquals(4, groupIds.size());
+ Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
+ while (iter.hasNext()) {
+ TimelineEntityGroupId groupId = iter.next();
+ Assert.assertEquals(appId2, groupId.getApplicationId());
+ Assert.assertTrue(getGroupIds(dagID2, appId2, 100, 50).contains(groupId.getTimelineEntityGroupId()));
+ }
+ }
+ }
+
+ @Test
+ public void testGetTimelineEntityGroupIdByIdWithOldGroupIdsMultiple() {
+ TimelineCachePluginImpl plugin = createPlugin(100, "25, 50");
+ for (Entry<String, String> entry : typeIdMap2.entrySet()) {
+ Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(), entry.getKey());
+ if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) {
+ Assert.assertNull(groupIds);
+ continue;
+ }
+ Assert.assertEquals(5, groupIds.size());
+ Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
+ while (iter.hasNext()) {
+ TimelineEntityGroupId groupId = iter.next();
+ Assert.assertEquals(appId2, groupId.getApplicationId());
+ Assert.assertTrue(getGroupIds(dagID2, appId2, 100, 25, 50).contains(groupId.getTimelineEntityGroupId()));
+ }
+ }
+ }
+
+ @Test
+ public void testGetTimelineEntityGroupIdByIdWithOldGroupIdsEmpty() {
+ TimelineCachePluginImpl plugin = createPlugin(100, "");
+ for (Entry<String, String> entry : typeIdMap2.entrySet()) {
+ Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(), entry.getKey());
+ if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) {
+ Assert.assertNull(groupIds);
+ continue;
+ }
+ Assert.assertEquals(3, groupIds.size());
+ Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
+ while (iter.hasNext()) {
+ TimelineEntityGroupId groupId = iter.next();
+ Assert.assertEquals(appId2, groupId.getApplicationId());
+ Assert.assertTrue(getGroupIds(dagID2, appId2, 100).contains(groupId.getTimelineEntityGroupId()));
}
}
}
@Test
public void testGetTimelineEntityGroupIdByIds() {
+ TimelineCachePluginImpl plugin = createPlugin(100, null);
for (Entry<String, String> entry : typeIdMap1.entrySet()) {
SortedSet<String> entityIds = new TreeSet<String>();
entityIds.add(entry.getValue());
@@ -150,31 +261,36 @@ public class TestTimelineCachePluginImpl {
Assert.assertNull(groupIds);
continue;
}
- Assert.assertEquals(4, groupIds.size());
+ Assert.assertEquals(6, groupIds.size());
int found = 0;
Iterator<TimelineEntityGroupId> iter = groupIds.iterator();
while (iter.hasNext()) {
TimelineEntityGroupId groupId = iter.next();
- if (groupId.getApplicationId().equals(appId1)
- && groupId.getTimelineEntityGroupId().equals(dagID1.toString())) {
- ++found;
- } else if (groupId.getApplicationId().equals(appId2)
- && groupId.getTimelineEntityGroupId().equals(dagID2.toString())) {
- ++found;
- } else if (groupId.getApplicationId().equals(appId1)
- && groupId.getTimelineEntityGroupId().equals(appId1.toString())) {
- ++found;
- } else if (groupId.getApplicationId().equals(appId2)
- && groupId.getTimelineEntityGroupId().equals(appId2.toString())) {
- ++found;
+ if (groupId.getApplicationId().equals(appId1)) {
+ String entityGroupId = groupId.getTimelineEntityGroupId();
+ if (getGroupIds(dagID1, appId1, 100).contains(entityGroupId)) {
+ ++found;
+ } else {
+ Assert.fail("Unexpected group id: " + entityGroupId);
+ }
+ } else if (groupId.getApplicationId().equals(appId2)) {
+ String entityGroupId = groupId.getTimelineEntityGroupId();
+ if (getGroupIds(dagID2, appId2, 100).contains(entityGroupId)) {
+ ++found;
+ } else {
+ Assert.fail("Unexpected group id: " + entityGroupId);
+ }
+ } else {
+ Assert.fail("Unexpected appId: " + groupId.getApplicationId());
}
}
- Assert.assertEquals("All groupIds not returned", 4, found);
+ Assert.assertEquals("All groupIds not returned", 6, found);
}
}
@Test
public void testInvalidIds() {
+ TimelineCachePluginImpl plugin = createPlugin(-1, null);
Assert.assertNull(plugin.getTimelineEntityGroupId(EntityTypes.TEZ_DAG_ID.name(),
vertexID1.toString()));
Assert.assertNull(plugin.getTimelineEntityGroupId(EntityTypes.TEZ_VERTEX_ID.name(),
@@ -190,6 +306,7 @@ public class TestTimelineCachePluginImpl {
@Test
public void testInvalidTypeRequests() {
+ TimelineCachePluginImpl plugin = createPlugin(-1, null);
Assert.assertNull(plugin.getTimelineEntityGroupId(EntityTypes.TEZ_APPLICATION.name(),
appId1.toString()));
Assert.assertNull(plugin.getTimelineEntityGroupId(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(),
@@ -206,7 +323,7 @@ public class TestTimelineCachePluginImpl {
@Test
public void testContainerIdConversion() {
-
+ TimelineCachePluginImpl plugin = createPlugin(-1, null);
String entityType = EntityTypes.TEZ_CONTAINER_ID.name();
SortedSet<String> entityIds = new TreeSet<String>();
entityIds.add("tez_" + cId1.toString());
@@ -255,6 +372,13 @@ public class TestTimelineCachePluginImpl {
}
}
Assert.assertEquals("All groupIds not returned", 1, found);
+ }
+ private Set<String> getGroupIds(TezDAGID dagId, ApplicationId appId, int ... allNumDagsPerGroup) {
+ HashSet<String> groupIds = Sets.newHashSet(dagId.toString(), appId.toString());
+ for (int numDagsPerGroup : allNumDagsPerGroup) {
+ groupIds.add(dagId.getGroupId(numDagsPerGroup));
+ }
+ return groupIds;
}
}