You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/03/28 19:48:25 UTC

[44/50] [abbrv] tez git commit: TEZ-3665. TestATSV15HistoryLoggingService should use mocked TimelineClient (zhiyuany)

TEZ-3665. TestATSV15HistoryLoggingService should use mocked TimelineClient (zhiyuany)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cbcb3a78
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cbcb3a78
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cbcb3a78

Branch: refs/heads/TEZ-1190
Commit: cbcb3a786e1af6a9ee61648ec9614686123182c3
Parents: 293c593
Author: Zhiyuan Yang <zh...@apache.org>
Authored: Wed Mar 22 21:14:51 2017 -0700
Committer: Zhiyuan Yang <zh...@apache.org>
Committed: Wed Mar 22 21:14:51 2017 -0700

----------------------------------------------------------------------
 .../ats/TestATSV15HistoryLoggingService.java    | 144 ++++++++-----------
 1 file changed, 58 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/cbcb3a78/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
index cbded35..ef5da81 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
@@ -24,6 +24,9 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.anyVararg;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
@@ -68,13 +71,20 @@ import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.hadoop.shim.HadoopShim;
 import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestATSV15HistoryLoggingService {
   private static ApplicationId appId = ApplicationId.newInstance(1000l, 1);
   private static ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
   private static String user = "TEST_USER";
 
-  private InMemoryTimelineClient timelineClient;
+  private TimelineClient timelineClient;
+  Map<TimelineEntityGroupId, List<TimelineEntity>> entityLog;
+  final TimelineEntityGroupId DEFAULT_GROUP_ID =
+    TimelineEntityGroupId.newInstance(ApplicationId.newInstance(0, -1), "");
+
   private AppContext appContext;
 
   @Test(timeout=2000)
@@ -91,14 +101,14 @@ public class TestATSV15HistoryLoggingService {
       Thread.sleep(100);
     }
 
-    assertEquals(2, timelineClient.entityLog.size());
+    assertEquals(2, entityLog.size());
 
-    List<TimelineEntity> amEvents = timelineClient.entityLog.get(
+    List<TimelineEntity> amEvents = entityLog.get(
         TimelineEntityGroupId.newInstance(appId, appId.toString()));
     assertNotNull(amEvents);
     assertEquals(1, amEvents.size());
 
-    List<TimelineEntity> nonGroupedDagEvents = timelineClient.entityLog.get(
+    List<TimelineEntity> nonGroupedDagEvents = entityLog.get(
         TimelineEntityGroupId.newInstance(appId, dagId1.toString()));
     assertNotNull(nonGroupedDagEvents);
     assertEquals(4, nonGroupedDagEvents.size());
@@ -119,14 +129,14 @@ public class TestATSV15HistoryLoggingService {
       Thread.sleep(100);
     }
 
-    assertEquals(2, timelineClient.entityLog.size());
+    assertEquals(2, entityLog.size());
 
-    List<TimelineEntity> amEvents = timelineClient.entityLog.get(
+    List<TimelineEntity> amEvents = entityLog.get(
         TimelineEntityGroupId.newInstance(appId, appId.toString()));
     assertNotNull(amEvents);
     assertEquals(1, amEvents.size());
 
-    List<TimelineEntity> nonGroupedDagEvents = timelineClient.entityLog.get(
+    List<TimelineEntity> nonGroupedDagEvents = entityLog.get(
         TimelineEntityGroupId.newInstance(appId, dagId1.toString()));
     assertNotNull(nonGroupedDagEvents);
     assertEquals(4, nonGroupedDagEvents.size());
@@ -161,27 +171,27 @@ public class TestATSV15HistoryLoggingService {
     assertEquals(dagId1.getGroupId(numDagsPerGroup), dagId2.getGroupId(numDagsPerGroup));
     assertNotEquals(dagId2.getGroupId(numDagsPerGroup), dagId3.getGroupId(numDagsPerGroup));
 
-    assertEquals(3, timelineClient.entityLog.size());
+    assertEquals(3, entityLog.size());
 
-    List<TimelineEntity> amEvents = timelineClient.entityLog.get(
+    List<TimelineEntity> amEvents = entityLog.get(
         TimelineEntityGroupId.newInstance(appId, appId.toString()));
     assertNotNull(amEvents);
     assertEquals(3, amEvents.size());
 
-    List<TimelineEntity> nonGroupedDagEvents = timelineClient.entityLog.get(
+    List<TimelineEntity> nonGroupedDagEvents = entityLog.get(
         TimelineEntityGroupId.newInstance(appId, dagId1.toString()));
     assertNull(nonGroupedDagEvents);
 
-    List<TimelineEntity> groupedDagEvents = timelineClient.entityLog.get(
+    List<TimelineEntity> groupedDagEvents = entityLog.get(
         TimelineEntityGroupId.newInstance(appId, dagId1.getGroupId(numDagsPerGroup)));
     assertNotNull(groupedDagEvents);
     assertEquals(8, groupedDagEvents.size());
 
-    nonGroupedDagEvents = timelineClient.entityLog.get(
+    nonGroupedDagEvents = entityLog.get(
         TimelineEntityGroupId.newInstance(appId, dagId3.toString()));
     assertNull(nonGroupedDagEvents);
 
-    groupedDagEvents = timelineClient.entityLog.get(
+    groupedDagEvents = entityLog.get(
         TimelineEntityGroupId.newInstance(appId, dagId3.getGroupId(numDagsPerGroup)));
     assertNotNull(groupedDagEvents);
     assertEquals(4, groupedDagEvents.size());
@@ -219,7 +229,7 @@ public class TestATSV15HistoryLoggingService {
 
     // All calls made with session domain id.
     verify(historyACLPolicyManager, times(5)).updateTimelineEntityDomain(any(), eq("session-id"));
-    assertTrue(timelineClient.entityLog.size() > 0);
+    assertTrue(entityLog.size() > 0);
 
     service.stop();
   }
@@ -252,7 +262,7 @@ public class TestATSV15HistoryLoggingService {
 
     // History logging is disabled.
     verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any());
-    assertEquals(0, timelineClient.entityLog.size());
+    assertEquals(0, entityLog.size());
 
     service.stop();
   }
@@ -285,7 +295,7 @@ public class TestATSV15HistoryLoggingService {
 
     // No domain updates but history logging happened.
     verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any());
-    assertTrue(timelineClient.entityLog.size() > 0);
+    assertTrue(entityLog.size() > 0);
 
     service.stop();
   }
@@ -370,7 +380,7 @@ public class TestATSV15HistoryLoggingService {
 
     // No history logging calls were done
     verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any());
-    assertEquals(0, timelineClient.entityLog.size());
+    assertEquals(0, entityLog.size());
 
     service.stop();
   }
@@ -413,12 +423,12 @@ public class TestATSV15HistoryLoggingService {
     // AM events sent, dag events are not sent.
     verify(historyACLPolicyManager, times(1)).updateTimelineEntityDomain(any(), eq("session-id"));
     verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), eq("dag-id"));
-    assertEquals(1, timelineClient.entityLog.size());
+    assertEquals(1, entityLog.size());
 
     service.stop();
   }
 
-  private ATSV15HistoryLoggingService createService(int numDagsPerGroup) {
+  private ATSV15HistoryLoggingService createService(int numDagsPerGroup) throws IOException, YarnException {
     ATSV15HistoryLoggingService service = new ATSV15HistoryLoggingService();
     appContext = mock(AppContext.class);
     when(appContext.getApplicationID()).thenReturn(appId);
@@ -433,13 +443,40 @@ public class TestATSV15HistoryLoggingService {
     service.init(conf);
 
     // Set timeline service.
-    timelineClient = new InMemoryTimelineClient();
-    timelineClient.init(conf);
+    timelineClient = mock(TimelineClient.class);
+    entityLog = new HashMap<>();
+    //timelineClient.init(conf);
+    when(timelineClient.getDelegationToken(anyString())).thenReturn(null);
+    when(timelineClient.renewDelegationToken(Matchers.<Token<TimelineDelegationTokenIdentifier>>any())).thenReturn(0L);
+    when(timelineClient.putEntities(Matchers.<TimelineEntity>anyVararg())).thenAnswer(new Answer() {
+      @Override
+      public TimelinePutResponse answer(InvocationOnMock invocation) throws Throwable {
+        return putEntityHelper(DEFAULT_GROUP_ID, invocation.getArguments(), 0);
+      }
+    });
+    when(timelineClient.putEntities(any(ApplicationAttemptId.class), any(TimelineEntityGroupId.class), Matchers.<TimelineEntity>anyVararg())).thenAnswer(new Answer() {
+      @Override
+      public TimelinePutResponse answer(InvocationOnMock invocation) throws Throwable {
+        return putEntityHelper(invocation.getArgumentAt(1, TimelineEntityGroupId.class), invocation.getArguments(), 2);
+      }
+    });
     service.timelineClient = timelineClient;
 
     return service;
   }
 
+  private TimelinePutResponse putEntityHelper(TimelineEntityGroupId groupId, Object[] args, int firstEntityIdx) { // Test helper: records args[firstEntityIdx..] in entityLog under groupId.
+    List<TimelineEntity> groupEntities = entityLog.get(groupId);
+    if (groupEntities == null) { // nothing logged for this group yet: create its list lazily
+      groupEntities = new ArrayList<>();
+      entityLog.put(groupId, groupEntities);
+    }
+    for (int i = firstEntityIdx; i < args.length; i++) { // arguments before firstEntityIdx are not entities and are skipped
+      groupEntities.add((TimelineEntity) args[i]); // assumes each remaining argument is a TimelineEntity (unchecked cast)
+    }
+    return null; // always null; no TimelinePutResponse is constructed
+  }
+
   private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId,
                                                   ATSV15HistoryLoggingService service) {
     List<DAGHistoryEvent> historyEvents = new ArrayList<>();
@@ -463,69 +500,4 @@ public class TestATSV15HistoryLoggingService {
             null, null)));
     return historyEvents;
   }
-
-  private static class InMemoryTimelineClient extends TimelineClient {
-    Map<TimelineEntityGroupId, List<TimelineEntity>> entityLog = new HashMap<>();
-
-    protected InMemoryTimelineClient() {
-      super("InMemoryTimelineClient");
-    }
-
-    @Override
-    public void flush() throws IOException {
-    }
-
-    public static final ApplicationId DEFAULT_APP_ID = ApplicationId.newInstance(0, -1);
-    public static final TimelineEntityGroupId DEFAULT_GROUP_ID =
-        TimelineEntityGroupId.newInstance(DEFAULT_APP_ID, "");
-
-    @Override
-    public synchronized TimelinePutResponse putEntities(TimelineEntity... entities)
-        throws IOException, YarnException {
-      return putEntities(null, DEFAULT_GROUP_ID, entities);
-    }
-
-    @Override
-    public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
-        TimelineEntityGroupId groupId,
-        TimelineEntity... entities) throws IOException, YarnException {
-      List<TimelineEntity> groupEntities = entityLog.get(groupId);
-      if (groupEntities == null) {
-        groupEntities = new ArrayList<>();
-        entityLog.put(groupId, groupEntities);
-      }
-      for (TimelineEntity entity : entities) {
-        groupEntities.add(entity);
-      }
-      return null;
-    }
-
-    @Override
-    public void putDomain(TimelineDomain domain) throws IOException, YarnException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void putDomain(ApplicationAttemptId appAttemptId, TimelineDomain domain)
-        throws IOException, YarnException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Token<TimelineDelegationTokenIdentifier> getDelegationToken(String renewer)
-        throws IOException, YarnException {
-      return null;
-    }
-
-    @Override
-    public long renewDelegationToken(Token<TimelineDelegationTokenIdentifier> timelineDT)
-        throws IOException, YarnException {
-      return 0;
-    }
-
-    @Override
-    public void cancelDelegationToken(Token<TimelineDelegationTokenIdentifier> timelineDT)
-        throws IOException, YarnException {
-    }
-  }
 }