You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/03/03 20:32:20 UTC
[42/43] hadoop git commit: YARN-3210. Refactored timeline aggregator
according to new code organization proposed in YARN-3166. Contributed by Li
Lu.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java
deleted file mode 100644
index 902047d..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
-import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
-import org.junit.Test;
-
-public class TestPerNodeAggregatorServer {
- private ApplicationAttemptId appAttemptId;
-
- public TestPerNodeAggregatorServer() {
- ApplicationId appId =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
- appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
- }
-
- @Test
- public void testAddApplication() throws Exception {
- PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication();
- // aggregator should have a single app
- assertTrue(aggregator.hasApplication(
- appAttemptId.getApplicationId().toString()));
- aggregator.close();
- }
-
- @Test
- public void testAddApplicationNonAMContainer() throws Exception {
- PerNodeAggregatorServer aggregator = createAggregator();
-
- ContainerId containerId = getContainerId(2L); // not an AM
- ContainerInitializationContext context =
- mock(ContainerInitializationContext.class);
- when(context.getContainerId()).thenReturn(containerId);
- aggregator.initializeContainer(context);
- // aggregator should not have that app
- assertFalse(aggregator.hasApplication(
- appAttemptId.getApplicationId().toString()));
- }
-
- @Test
- public void testRemoveApplication() throws Exception {
- PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication();
- // aggregator should have a single app
- String appIdStr = appAttemptId.getApplicationId().toString();
- assertTrue(aggregator.hasApplication(appIdStr));
-
- ContainerId containerId = getAMContainerId();
- ContainerTerminationContext context =
- mock(ContainerTerminationContext.class);
- when(context.getContainerId()).thenReturn(containerId);
- aggregator.stopContainer(context);
- // aggregator should not have that app
- assertFalse(aggregator.hasApplication(appIdStr));
- aggregator.close();
- }
-
- @Test
- public void testRemoveApplicationNonAMContainer() throws Exception {
- PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication();
- // aggregator should have a single app
- String appIdStr = appAttemptId.getApplicationId().toString();
- assertTrue(aggregator.hasApplication(appIdStr));
-
- ContainerId containerId = getContainerId(2L); // not an AM
- ContainerTerminationContext context =
- mock(ContainerTerminationContext.class);
- when(context.getContainerId()).thenReturn(containerId);
- aggregator.stopContainer(context);
- // aggregator should still have that app
- assertTrue(aggregator.hasApplication(appIdStr));
- aggregator.close();
- }
-
- @Test(timeout = 60000)
- public void testLaunch() throws Exception {
- ExitUtil.disableSystemExit();
- PerNodeAggregatorServer server = null;
- try {
- server =
- PerNodeAggregatorServer.launchServer(new String[0]);
- } catch (ExitUtil.ExitException e) {
- assertEquals(0, e.status);
- ExitUtil.resetFirstExitException();
- fail();
- } finally {
- if (server != null) {
- server.stop();
- }
- }
- }
-
- private PerNodeAggregatorServer createAggregatorAndAddApplication() {
- PerNodeAggregatorServer aggregator = createAggregator();
- // create an AM container
- ContainerId containerId = getAMContainerId();
- ContainerInitializationContext context =
- mock(ContainerInitializationContext.class);
- when(context.getContainerId()).thenReturn(containerId);
- aggregator.initializeContainer(context);
- return aggregator;
- }
-
- private PerNodeAggregatorServer createAggregator() {
- AppLevelServiceManager serviceManager = spy(new AppLevelServiceManager());
- doReturn(new Configuration()).when(serviceManager).getConfig();
- PerNodeAggregatorServer aggregator =
- spy(new PerNodeAggregatorServer(serviceManager));
- return aggregator;
- }
-
- private ContainerId getAMContainerId() {
- return getContainerId(1L);
- }
-
- private ContainerId getContainerId(long id) {
- return ContainerId.newContainerId(appAttemptId, id);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java
new file mode 100644
index 0000000..1c89ead
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+import org.junit.Test;
+
+public class TestPerNodeTimelineAggregatorsAuxService { // exercises container init/stop handling and launchServer of PerNodeTimelineAggregatorsAuxService
+ private ApplicationAttemptId appAttemptId; // attempt id shared by every container id built in this class
+
+ public TestPerNodeTimelineAggregatorsAuxService() { // builds one application id / attempt id pair per test instance
+ ApplicationId appId =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+ }
+
+ @Test
+ public void testAddApplication() throws Exception { // initializing the AM container (id 1) must register the app
+ PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication();
+ // the app id of the initialized AM container must now be known to auxService
+ assertTrue(auxService.hasApplication(
+ appAttemptId.getApplicationId().toString()));
+ auxService.close();
+ }
+
+ @Test
+ public void testAddApplicationNonAMContainer() throws Exception { // initializing container id 2 must not register the app
+ PerNodeTimelineAggregatorsAuxService auxService = createAggregator();
+
+ ContainerId containerId = getContainerId(2L); // not an AM
+ ContainerInitializationContext context =
+ mock(ContainerInitializationContext.class);
+ when(context.getContainerId()).thenReturn(containerId); // the mocked context only answers getContainerId()
+ auxService.initializeContainer(context);
+ // auxService should not have that app
+ assertFalse(auxService.hasApplication(
+ appAttemptId.getApplicationId().toString()));
+ } // NOTE(review): unlike the other tests in this class, auxService.close() is never called here
+
+ @Test
+ public void testRemoveApplication() throws Exception { // stopping the AM container must unregister the app
+ PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication();
+ // precondition: the app was registered by the AM container initialization
+ String appIdStr = appAttemptId.getApplicationId().toString();
+ assertTrue(auxService.hasApplication(appIdStr));
+
+ ContainerId containerId = getAMContainerId();
+ ContainerTerminationContext context =
+ mock(ContainerTerminationContext.class);
+ when(context.getContainerId()).thenReturn(containerId);
+ auxService.stopContainer(context);
+ // auxService should not have that app
+ assertFalse(auxService.hasApplication(appIdStr));
+ auxService.close();
+ }
+
+ @Test
+ public void testRemoveApplicationNonAMContainer() throws Exception { // stopping container id 2 must leave the app registered
+ PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication();
+ // precondition: the app was registered by the AM container initialization
+ String appIdStr = appAttemptId.getApplicationId().toString();
+ assertTrue(auxService.hasApplication(appIdStr));
+
+ ContainerId containerId = getContainerId(2L); // not an AM
+ ContainerTerminationContext context =
+ mock(ContainerTerminationContext.class);
+ when(context.getContainerId()).thenReturn(containerId);
+ auxService.stopContainer(context);
+ // auxService should still have that app
+ assertTrue(auxService.hasApplication(appIdStr));
+ auxService.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testLaunch() throws Exception { // launchServer with no arguments must return without raising ExitException
+ ExitUtil.disableSystemExit(); // with system exit disabled, an exit attempt is caught below as ExitUtil.ExitException
+ PerNodeTimelineAggregatorsAuxService auxService = null;
+ try {
+ auxService =
+ PerNodeTimelineAggregatorsAuxService.launchServer(new String[0]);
+ } catch (ExitUtil.ExitException e) {
+ assertEquals(0, e.status);
+ ExitUtil.resetFirstExitException();
+ fail(); // NOTE(review): reached even when status is 0, so any ExitException fails this test
+ } finally {
+ if (auxService != null) { // stop only if launchServer actually returned an instance
+ auxService.stop();
+ }
+ }
+ }
+
+ private PerNodeTimelineAggregatorsAuxService createAggregatorAndAddApplication() { // returns a service on which the AM container was already initialized
+ PerNodeTimelineAggregatorsAuxService auxService = createAggregator();
+ // initialize the service with the AM container (id 1) of appAttemptId
+ ContainerId containerId = getAMContainerId();
+ ContainerInitializationContext context =
+ mock(ContainerInitializationContext.class);
+ when(context.getContainerId()).thenReturn(containerId);
+ auxService.initializeContainer(context);
+ return auxService;
+ }
+
+ private PerNodeTimelineAggregatorsAuxService createAggregator() { // returns a spied service backed by a spied TimelineAggregatorsCollection
+ TimelineAggregatorsCollection
+ aggregatorsCollection = spy(new TimelineAggregatorsCollection());
+ doReturn(new Configuration()).when(aggregatorsCollection).getConfig(); // stub getConfig() on the spy with a fresh Configuration
+ PerNodeTimelineAggregatorsAuxService auxService =
+ spy(new PerNodeTimelineAggregatorsAuxService(aggregatorsCollection));
+ return auxService;
+ }
+
+ private ContainerId getAMContainerId() { // these tests use container id 1 as the AM container
+ return getContainerId(1L);
+ }
+
+ private ContainerId getContainerId(long id) { // container id with the given number under appAttemptId
+ return ContainerId.newContainerId(appAttemptId, id);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java
new file mode 100644
index 0000000..821e455
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+public class TestTimelineAggregator {
+ // NOTE(review): no test methods are declared in this class.
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java
new file mode 100644
index 0000000..cec1d71
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+public class TestTimelineAggregatorsCollection { // concurrent putIfAbsent/remove checks against TimelineAggregatorsCollection
+
+ @Test(timeout=60000)
+ public void testMultithreadedAdd() throws Exception { // NUM_APPS tasks each add a distinct app id in parallel
+ final TimelineAggregatorsCollection aggregatorCollection =
+ spy(new TimelineAggregatorsCollection());
+ doReturn(new Configuration()).when(aggregatorCollection).getConfig(); // stub getConfig() on the spy with a fresh Configuration
+
+ final int NUM_APPS = 5;
+ List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
+ for (int i = 0; i < NUM_APPS; i++) {
+ final String appId = String.valueOf(i); // each task gets its own app id "0".."4"
+ Callable<Boolean> task = new Callable<Boolean>() {
+ public Boolean call() {
+ AppLevelTimelineAggregator aggregator =
+ new AppLevelTimelineAggregator(appId);
+ return (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator); // true when putIfAbsent hands back the instance this task created
+ }
+ };
+ tasks.add(task);
+ }
+ ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS); // one thread per task
+ try {
+ List<Future<Boolean>> futures = executor.invokeAll(tasks);
+ for (Future<Boolean> future: futures) {
+ assertTrue(future.get()); // every task must report a successful put
+ }
+ } finally {
+ executor.shutdownNow();
+ }
+ // every app id must now be present in the collection
+ for (int i = 0; i < NUM_APPS; i++) {
+ assertTrue(aggregatorCollection.containsKey(String.valueOf(i)));
+ }
+ }
+
+ @Test // NOTE(review): no timeout here, unlike testMultithreadedAdd above
+ public void testMultithreadedAddAndRemove() throws Exception { // NUM_APPS tasks each add and then remove a distinct app id in parallel
+ final TimelineAggregatorsCollection aggregatorCollection =
+ spy(new TimelineAggregatorsCollection());
+ doReturn(new Configuration()).when(aggregatorCollection).getConfig(); // stub getConfig() on the spy with a fresh Configuration
+
+ final int NUM_APPS = 5;
+ List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
+ for (int i = 0; i < NUM_APPS; i++) {
+ final String appId = String.valueOf(i); // each task gets its own app id "0".."4"
+ Callable<Boolean> task = new Callable<Boolean>() {
+ public Boolean call() {
+ AppLevelTimelineAggregator aggregator =
+ new AppLevelTimelineAggregator(appId);
+ boolean successPut =
+ (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator);
+ return successPut && aggregatorCollection.remove(appId); // remove is attempted only when the put succeeded
+ }
+ };
+ tasks.add(task);
+ }
+ ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS); // one thread per task
+ try {
+ List<Future<Boolean>> futures = executor.invokeAll(tasks);
+ for (Future<Boolean> future: futures) {
+ assertTrue(future.get()); // every task must report a successful put followed by a successful remove
+ }
+ } finally {
+ executor.shutdownNow();
+ }
+ // every app id must be absent again after its remove
+ for (int i = 0; i < NUM_APPS; i++) {
+ assertFalse(aggregatorCollection.containsKey(String.valueOf(i)));
+ }
+ }
+}