You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/03/03 20:32:20 UTC

[42/43] hadoop git commit: YARN-3210. Refactored timeline aggregator according to new code organization proposed in YARN-3166. Contributed by Li Lu.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java
deleted file mode 100644
index 902047d..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
-import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
-import org.junit.Test;
-
-public class TestPerNodeAggregatorServer {
-  private ApplicationAttemptId appAttemptId;
-
-  public TestPerNodeAggregatorServer() {
-    ApplicationId appId =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
-    appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
-  }
-
-  @Test
-  public void testAddApplication() throws Exception {
-    PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication();
-    // aggregator should have a single app
-    assertTrue(aggregator.hasApplication(
-        appAttemptId.getApplicationId().toString()));
-    aggregator.close();
-  }
-
-  @Test
-  public void testAddApplicationNonAMContainer() throws Exception {
-    PerNodeAggregatorServer aggregator = createAggregator();
-
-    ContainerId containerId = getContainerId(2L); // not an AM
-    ContainerInitializationContext context =
-        mock(ContainerInitializationContext.class);
-    when(context.getContainerId()).thenReturn(containerId);
-    aggregator.initializeContainer(context);
-    // aggregator should not have that app
-    assertFalse(aggregator.hasApplication(
-        appAttemptId.getApplicationId().toString()));
-  }
-
-  @Test
-  public void testRemoveApplication() throws Exception {
-    PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication();
-    // aggregator should have a single app
-    String appIdStr = appAttemptId.getApplicationId().toString();
-    assertTrue(aggregator.hasApplication(appIdStr));
-
-    ContainerId containerId = getAMContainerId();
-    ContainerTerminationContext context =
-        mock(ContainerTerminationContext.class);
-    when(context.getContainerId()).thenReturn(containerId);
-    aggregator.stopContainer(context);
-    // aggregator should not have that app
-    assertFalse(aggregator.hasApplication(appIdStr));
-    aggregator.close();
-  }
-
-  @Test
-  public void testRemoveApplicationNonAMContainer() throws Exception {
-    PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication();
-    // aggregator should have a single app
-    String appIdStr = appAttemptId.getApplicationId().toString();
-    assertTrue(aggregator.hasApplication(appIdStr));
-
-    ContainerId containerId = getContainerId(2L); // not an AM
-    ContainerTerminationContext context =
-        mock(ContainerTerminationContext.class);
-    when(context.getContainerId()).thenReturn(containerId);
-    aggregator.stopContainer(context);
-    // aggregator should still have that app
-    assertTrue(aggregator.hasApplication(appIdStr));
-    aggregator.close();
-  }
-
-  @Test(timeout = 60000)
-  public void testLaunch() throws Exception {
-    ExitUtil.disableSystemExit();
-    PerNodeAggregatorServer server = null;
-    try {
-      server =
-          PerNodeAggregatorServer.launchServer(new String[0]);
-    } catch (ExitUtil.ExitException e) {
-      assertEquals(0, e.status);
-      ExitUtil.resetFirstExitException();
-      fail();
-    } finally {
-      if (server != null) {
-        server.stop();
-      }
-    }
-  }
-
-  private PerNodeAggregatorServer createAggregatorAndAddApplication() {
-    PerNodeAggregatorServer aggregator = createAggregator();
-    // create an AM container
-    ContainerId containerId = getAMContainerId();
-    ContainerInitializationContext context =
-        mock(ContainerInitializationContext.class);
-    when(context.getContainerId()).thenReturn(containerId);
-    aggregator.initializeContainer(context);
-    return aggregator;
-  }
-
-  private PerNodeAggregatorServer createAggregator() {
-    AppLevelServiceManager serviceManager = spy(new AppLevelServiceManager());
-    doReturn(new Configuration()).when(serviceManager).getConfig();
-    PerNodeAggregatorServer aggregator =
-        spy(new PerNodeAggregatorServer(serviceManager));
-    return aggregator;
-  }
-
-  private ContainerId getAMContainerId() {
-    return getContainerId(1L);
-  }
-
-  private ContainerId getContainerId(long id) {
-    return ContainerId.newContainerId(appAttemptId, id);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java
new file mode 100644
index 0000000..1c89ead
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+import org.junit.Test;
+
+public class TestPerNodeTimelineAggregatorsAuxService {
+  private ApplicationAttemptId appAttemptId;
+
+  public TestPerNodeTimelineAggregatorsAuxService() {
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+  }
+
+  @Test
+  public void testAddApplication() throws Exception {
+    PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication();
+    // auxService should have a single app
+    assertTrue(auxService.hasApplication(
+        appAttemptId.getApplicationId().toString()));
+    auxService.close();
+  }
+
+  @Test
+  public void testAddApplicationNonAMContainer() throws Exception {
+    PerNodeTimelineAggregatorsAuxService auxService = createAggregator();
+
+    ContainerId containerId = getContainerId(2L); // not an AM
+    ContainerInitializationContext context =
+        mock(ContainerInitializationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    auxService.initializeContainer(context);
+    // auxService should not have that app
+    assertFalse(auxService.hasApplication(
+        appAttemptId.getApplicationId().toString()));
+  }
+
+  @Test
+  public void testRemoveApplication() throws Exception {
+    PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication();
+    // auxService should have a single app
+    String appIdStr = appAttemptId.getApplicationId().toString();
+    assertTrue(auxService.hasApplication(appIdStr));
+
+    ContainerId containerId = getAMContainerId();
+    ContainerTerminationContext context =
+        mock(ContainerTerminationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    auxService.stopContainer(context);
+    // auxService should not have that app
+    assertFalse(auxService.hasApplication(appIdStr));
+    auxService.close();
+  }
+
+  @Test
+  public void testRemoveApplicationNonAMContainer() throws Exception {
+    PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication();
+    // auxService should have a single app
+    String appIdStr = appAttemptId.getApplicationId().toString();
+    assertTrue(auxService.hasApplication(appIdStr));
+
+    ContainerId containerId = getContainerId(2L); // not an AM
+    ContainerTerminationContext context =
+        mock(ContainerTerminationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    auxService.stopContainer(context);
+    // auxService should still have that app
+    assertTrue(auxService.hasApplication(appIdStr));
+    auxService.close();
+  }
+
+  @Test(timeout = 60000)
+  public void testLaunch() throws Exception {
+    ExitUtil.disableSystemExit();
+    PerNodeTimelineAggregatorsAuxService auxService = null;
+    try {
+      auxService =
+          PerNodeTimelineAggregatorsAuxService.launchServer(new String[0]);
+    } catch (ExitUtil.ExitException e) {
+      assertEquals(0, e.status);
+      ExitUtil.resetFirstExitException();
+      fail();
+    } finally {
+      if (auxService != null) {
+        auxService.stop();
+      }
+    }
+  }
+
+  private PerNodeTimelineAggregatorsAuxService createAggregatorAndAddApplication() {
+    PerNodeTimelineAggregatorsAuxService auxService = createAggregator();
+    // create an AM container
+    ContainerId containerId = getAMContainerId();
+    ContainerInitializationContext context =
+        mock(ContainerInitializationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    auxService.initializeContainer(context);
+    return auxService;
+  }
+
+  private PerNodeTimelineAggregatorsAuxService createAggregator() {
+    TimelineAggregatorsCollection
+        aggregatorsCollection = spy(new TimelineAggregatorsCollection());
+    doReturn(new Configuration()).when(aggregatorsCollection).getConfig();
+    PerNodeTimelineAggregatorsAuxService auxService =
+        spy(new PerNodeTimelineAggregatorsAuxService(aggregatorsCollection));
+    return auxService;
+  }
+
+  private ContainerId getAMContainerId() {
+    return getContainerId(1L);
+  }
+
+  private ContainerId getContainerId(long id) {
+    return ContainerId.newContainerId(appAttemptId, id);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java
new file mode 100644
index 0000000..821e455
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+public class TestTimelineAggregator {
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java
new file mode 100644
index 0000000..cec1d71
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+public class TestTimelineAggregatorsCollection {
+
+  @Test(timeout=60000)
+  public void testMultithreadedAdd() throws Exception {
+    final TimelineAggregatorsCollection aggregatorCollection =
+        spy(new TimelineAggregatorsCollection());
+    doReturn(new Configuration()).when(aggregatorCollection).getConfig();
+
+    final int NUM_APPS = 5;
+    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
+    for (int i = 0; i < NUM_APPS; i++) {
+      final String appId = String.valueOf(i);
+      Callable<Boolean> task = new Callable<Boolean>() {
+        public Boolean call() {
+          AppLevelTimelineAggregator aggregator =
+              new AppLevelTimelineAggregator(appId);
+          return (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator);
+        }
+      };
+      tasks.add(task);
+    }
+    ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
+    try {
+      List<Future<Boolean>> futures = executor.invokeAll(tasks);
+      for (Future<Boolean> future: futures) {
+        assertTrue(future.get());
+      }
+    } finally {
+      executor.shutdownNow();
+    }
+    // check the keys
+    for (int i = 0; i < NUM_APPS; i++) {
+      assertTrue(aggregatorCollection.containsKey(String.valueOf(i)));
+    }
+  }
+
+  @Test
+  public void testMultithreadedAddAndRemove() throws Exception {
+    final TimelineAggregatorsCollection aggregatorCollection =
+        spy(new TimelineAggregatorsCollection());
+    doReturn(new Configuration()).when(aggregatorCollection).getConfig();
+
+    final int NUM_APPS = 5;
+    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
+    for (int i = 0; i < NUM_APPS; i++) {
+      final String appId = String.valueOf(i);
+      Callable<Boolean> task = new Callable<Boolean>() {
+        public Boolean call() {
+          AppLevelTimelineAggregator aggregator =
+              new AppLevelTimelineAggregator(appId);
+          boolean successPut =
+              (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator);
+          return successPut && aggregatorCollection.remove(appId);
+        }
+      };
+      tasks.add(task);
+    }
+    ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
+    try {
+      List<Future<Boolean>> futures = executor.invokeAll(tasks);
+      for (Future<Boolean> future: futures) {
+        assertTrue(future.get());
+      }
+    } finally {
+      executor.shutdownNow();
+    }
+    // check the keys
+    for (int i = 0; i < NUM_APPS; i++) {
+      assertFalse(aggregatorCollection.containsKey(String.valueOf(i)));
+    }
+  }
+}