You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jl...@apache.org on 2015/09/19 02:23:42 UTC
ambari git commit: AMBARI-13065: RU: Core Slaves restart schedule is
extremely slow on very large cluster (jluniya)
Repository: ambari
Updated Branches:
refs/heads/trunk 02cdab026 -> 25fd9a627
AMBARI-13065: RU: Core Slaves restart schedule is extremely slow on very large cluster (jluniya)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/25fd9a62
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/25fd9a62
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/25fd9a62
Branch: refs/heads/trunk
Commit: 25fd9a627e6a451fdf734ff8cdbd86c74f10634c
Parents: 02cdab0
Author: Jayush Luniya <jl...@hortonworks.com>
Authored: Fri Sep 18 17:23:32 2015 -0700
Committer: Jayush Luniya <jl...@hortonworks.com>
Committed: Fri Sep 18 17:23:32 2015 -0700
----------------------------------------------------------------------
.../actionmanager/ActionDBAccessorImpl.java | 70 ++++--
.../server/actionmanager/ActionScheduler.java | 10 +-
.../ambari/server/actionmanager/Request.java | 40 ++-
.../apache/ambari/server/utils/LoopBody.java | 27 +++
.../apache/ambari/server/utils/Parallel.java | 242 +++++++++++++++++++
.../ambari/server/utils/ParallelLoopResult.java | 63 +++++
.../AmbariManagementControllerTest.java | 3 +-
.../ambari/server/utils/TestParallel.java | 188 ++++++++++++++
8 files changed, 613 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/25fd9a62/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 3cd0681..8768590 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -48,6 +48,9 @@ import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
import org.apache.ambari.server.orm.entities.StageEntity;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.utils.LoopBody;
+import org.apache.ambari.server.utils.Parallel;
+import org.apache.ambari.server.utils.ParallelLoopResult;
import org.apache.ambari.server.utils.StageUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -134,11 +137,25 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
*/
@Override
public List<Stage> getAllStages(long requestId) {
- List<Stage> stages = new ArrayList<Stage>();
- for (StageEntity stageEntity : stageDAO.findByRequestId(requestId)) {
- stages.add(stageFactory.createExisting(stageEntity));
+ List<StageEntity> stageEntities = stageDAO.findByRequestId(requestId);
+ ParallelLoopResult<Stage> loopResult = Parallel.forLoop(stageEntities, new LoopBody<StageEntity, Stage>() {
+ @Override
+ public Stage run(StageEntity stageEntity) {
+ return stageFactory.createExisting(stageEntity);
+ }
+ });
+ if(loopResult.getIsCompleted()) {
+ return loopResult.getResult();
+ } else {
+ // Fetch any missing results sequentially
+ List<Stage> stages = loopResult.getResult();
+ for(int i = 0; i < stages.size(); i++) {
+ if(stages.get(i) == null) {
+ stages.set(i, stageFactory.createExisting(stageEntities.get(i)));
+ }
+ }
+ return stages;
}
- return stages;
}
@Override
@@ -207,15 +224,25 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
*/
@Override
public List<Stage> getStagesInProgress() {
- List<Stage> stages = new ArrayList<Stage>();
-
List<StageEntity> stageEntities = stageDAO.findByCommandStatuses(HostRoleStatus.IN_PROGRESS_STATUSES);
-
- for (StageEntity stageEntity : stageEntities) {
- stages.add(stageFactory.createExisting(stageEntity));
+ ParallelLoopResult<Stage> loopResult = Parallel.forLoop(stageEntities, new LoopBody<StageEntity, Stage>() {
+ @Override
+ public Stage run(StageEntity stageEntity) {
+ return stageFactory.createExisting(stageEntity);
+ }
+ });
+ if(loopResult.getIsCompleted()) {
+ return loopResult.getResult();
+ } else {
+ // Fetch any missing results sequentially
+ List<Stage> stages = loopResult.getResult();
+ for(int i = 0; i < stages.size(); i++) {
+ if(stages.get(i) == null) {
+ stages.set(i, stageFactory.createExisting(stageEntities.get(i)));
+ }
+ }
+ return stages;
}
-
- return stages;
}
/**
@@ -676,11 +703,24 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
@Override
public List<Request> getRequests(Collection<Long> requestIds){
List<RequestEntity> requestEntities = requestDAO.findByPks(requestIds);
- List<Request> requests = new ArrayList<Request>(requestEntities.size());
- for (RequestEntity requestEntity : requestEntities) {
- requests.add(requestFactory.createExisting(requestEntity));
+ ParallelLoopResult<Request> loopResult = Parallel.forLoop(requestEntities, new LoopBody<RequestEntity, Request>() {
+ @Override
+ public Request run(RequestEntity requestEntity) {
+ return requestFactory.createExisting(requestEntity);
+ }
+ });
+ if(loopResult.getIsCompleted()) {
+ return loopResult.getResult();
+ } else {
+ // Fetch any missing results sequentially
+ List<Request> requests = loopResult.getResult();
+ for(int i = 0; i < requests.size(); i++) {
+ if(requests.get(i) == null) {
+ requests.set(i, requestFactory.createExisting(requestEntities.get(i)));
+ }
+ }
+ return requests;
}
- return requests;
}
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/25fd9a62/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index e752b05..3f289b2 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -469,14 +469,8 @@ class ActionScheduler implements Runnable {
if (prevStageId > 0) {
// Find previous stage instance
- List<Stage> allStages = db.getAllStages(stage.getRequestId());
- Stage prevStage = null;
- for (Stage s : allStages) {
- if (s.getStageId() == prevStageId) {
- prevStage = s;
- break;
- }
- }
+ String actionId = StageUtils.getActionId(stage.getRequestId(), prevStageId);
+ Stage prevStage = db.getStage(actionId);
// If the previous stage is skippable then we shouldn't automatically fail the given stage
if (prevStage == null || prevStage.isSkippable()) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/25fd9a62/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
index faebb20..26447e6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.actionmanager;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import com.google.inject.Inject;
@@ -36,6 +37,9 @@ import org.apache.ambari.server.orm.entities.RequestOperationLevelEntity;
import org.apache.ambari.server.orm.entities.RequestResourceFilterEntity;
import org.apache.ambari.server.orm.entities.StageEntity;
import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.utils.LoopBody;
+import org.apache.ambari.server.utils.Parallel;
+import org.apache.ambari.server.utils.ParallelLoopResult;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -158,7 +162,7 @@ public class Request {
/**
* Load existing request from database
*/
- public Request(@Assisted RequestEntity entity, StageFactory stageFactory, Clusters clusters){
+ public Request(@Assisted RequestEntity entity, final StageFactory stageFactory, Clusters clusters){
if (entity == null) {
throw new RuntimeException("Request entity cannot be null.");
}
@@ -189,13 +193,37 @@ public class Request {
this.requestScheduleId = entity.getRequestScheduleEntity().getScheduleId();
}
- for (StageEntity stageEntity : entity.getStages()) {
- Stage stage = stageFactory.createExisting(stageEntity);
- stages.add(stage);
+ Collection<StageEntity> stageEntities = entity.getStages();
+ if(stageEntities == null || stageEntities.isEmpty()) {
+ stages = Collections.emptyList();
+ } else {
+ List<StageEntity> stageEntityList;
+ if(stageEntities instanceof List) {
+ stageEntityList = (List<StageEntity>) stageEntities;
+ } else {
+ stageEntityList = new ArrayList<StageEntity>(stageEntities);
+ }
+ ParallelLoopResult<Stage> loopResult = Parallel.forLoop(stageEntityList, new LoopBody<StageEntity, Stage>() {
+ @Override
+ public Stage run(StageEntity stageEntity) {
+ return stageFactory.createExisting(stageEntity);
+ }
+ });
+ List<Stage> stageList;
+ if(loopResult.getIsCompleted()) {
+ stageList = loopResult.getResult();
+ } else {
+ // Fetch any missing results sequentially
+ stageList = loopResult.getResult();
+ for(int i = 0; i < stages.size(); i++) {
+ if(stageList.get(i) == null) {
+ stageList.set(i, stageFactory.createExisting(stageEntityList.get(i)));
+ }
+ }
+ }
+ stages = stageList;
}
-
resourceFilters = filtersFromEntity(entity);
-
operationLevel = operationLevelFromEntity(entity);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/25fd9a62/ambari-server/src/main/java/org/apache/ambari/server/utils/LoopBody.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/LoopBody.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/LoopBody.java
new file mode 100644
index 0000000..0a93814
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/LoopBody.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.utils;
+
+/**
+ * Interface for loop body invoked during each iteration of parallel loops
+ * @param <T> The type of source data that will be operated upon
+ * @param <R> The type of result data
+ */
+public interface LoopBody <T, R> {
+ R run(T t);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/25fd9a62/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java
new file mode 100644
index 0000000..a67ee5c
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.utils;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ThreadFactory;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.concurrent.Future;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides support for parallel loops.
+ * Iterations in the loop run in parallel in parallel loops.
+ */
+public class Parallel {
+
+ /**
+ * Max pool size
+ */
+ private static final int MAX_POOL_SIZE = Math.max(2, Runtime.getRuntime().availableProcessors());
+
+ /**
+ * Keep alive time (1 sec)
+ */
+ private static final int KEEP_ALIVE_TIME_MILLISECONDS = 1000;
+
+ /**
+ * Poll duration (10 secs)
+ */
+ private static final int POLL_DURATION_MILLISECONDS = 10000;
+
+ /**
+ * Logger
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(Parallel.class);
+
+ /**
+ * Thread pool executor
+ */
+ private static ExecutorService executor = initExecutor();
+
+ /**
+ * Initialize executor
+ *
+ * @return
+ */
+ private static ExecutorService initExecutor() {
+
+ BlockingQueue<Runnable> blockingQueue = new SynchronousQueue<Runnable>(); // Using synchronous queue
+
+ // Create thread pool
+ ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
+ 0, // Core pool size
+ MAX_POOL_SIZE, // Max pool size
+ KEEP_ALIVE_TIME_MILLISECONDS, // Keep alive time for idle threads
+ TimeUnit.MILLISECONDS,
+ blockingQueue, // Using synchronous queue
+ new ParallelLoopsThreadFactory(), // Thread pool factory to use
+ new ThreadPoolExecutor.CallerRunsPolicy() // Rejected tasks will run on calling thread.
+ );
+ threadPool.allowCoreThreadTimeOut(true);
+ LOG.debug(
+ "Parallel library initialized: ThreadCount = {}, CurrentPoolSize = {}, CorePoolSize = {}, MaxPoolSize = {}",
+ Thread.activeCount(), threadPool.getPoolSize(), threadPool.getCorePoolSize(), threadPool.getMaximumPoolSize());
+ return threadPool;
+ }
+
+ /**
+ * Executes a "for" parallel loop operation over all items in the data source in which iterations run in parallel.
+ *
+ * @param source Data source to iterate over
+ * @param loopBody The loop body that is invoked once per iteration
+ * @param <T> The type of data in the source
+ * @param <R> The type of data to be returned
+ * @return {@link ParallelLoopResult} Parallel loop result
+ */
+ public static <T, R> ParallelLoopResult<R> forLoop(
+ List<T> source,
+ final LoopBody<T, R> loopBody) {
+
+ if(source == null || source.isEmpty()) {
+ return new ParallelLoopResult<R>(true, (List<R>) Collections.emptyList());
+ }
+ return forLoop(source, 0, source.size(), loopBody);
+ }
+
+ /**
+ * Executes a "for" parallel loop operation in which iterations run in parallel.
+ *
+ * @param source Data source to iterate over
+ * @param startIndex The loop start index, inclusive
+ * @param endIndex The loop end index, exclusive
+ * @param loopBody The loop body that is invoked once per iteration
+ * @param <T> The type of data in the source
+ * @param <R> The type of data to be returned
+ * @return {@link ParallelLoopResult} Parallel loop result
+ *
+ */
+ public static <T, R> ParallelLoopResult<R> forLoop(
+ final List<T> source,
+ int startIndex,
+ int endIndex,
+ final LoopBody<T, R> loopBody) {
+
+ if(source == null || source.isEmpty() || startIndex == endIndex) {
+ return new ParallelLoopResult<R>(true, (List<R>) Collections.emptyList());
+ }
+ if(startIndex < 0 || startIndex >= source.size()) {
+ throw new IndexOutOfBoundsException("startIndex is out of bounds");
+ }
+ if(endIndex < 0 || endIndex > source.size()) {
+ throw new IndexOutOfBoundsException("endIndex is out of bounds");
+ }
+ if(startIndex > endIndex) {
+ throw new IndexOutOfBoundsException("startIndex > endIndex");
+ }
+ if(source.size() == 1 || (endIndex - startIndex) == 1) {
+ // Don't spawn a new thread for a single element list
+ List<R> result = Collections.singletonList(loopBody.run(source.get(startIndex)));
+ return new ParallelLoopResult<R>(true, result);
+ }
+
+ // Create a completion service for each call
+ CompletionService<ResultWrapper<R>> completionService = new ExecutorCompletionService<ResultWrapper<R>>(executor);
+
+ List<Future<ResultWrapper<R>>> futures = new LinkedList<Future<ResultWrapper<R>>>();
+ for (int i = startIndex; i < endIndex; i++) {
+ final Integer k = i;
+ Future<ResultWrapper<R>> future = completionService.submit(new Callable<ResultWrapper<R>>() {
+ @Override
+ public ResultWrapper<R> call() throws Exception {
+ ResultWrapper<R> res = new ResultWrapper<R>();
+ res.index = k;
+ res.result = loopBody.run(source.get(k));
+ return res;
+ }
+ });
+ futures.add(future);
+ }
+
+ boolean completed = true;
+ R[] result = (R[]) new Object[futures.size()];
+ try {
+ for (int i = 0; i < futures.size(); i++) {
+ Future<ResultWrapper<R>> futureResult = completionService.poll(POLL_DURATION_MILLISECONDS, TimeUnit.MILLISECONDS);
+ if (futureResult == null) {
+ // Time out! no progress was made during the last poll duration. Abort the threads and cancel the threads.
+ LOG.error("Completion service in Parallel.forLoop timed out!");
+ completed = false;
+ for(int fIndex = 0; fIndex < futures.size(); fIndex++) {
+ Future<ResultWrapper<R>> future = futures.get(fIndex);
+ if(future.isDone()) {
+ LOG.debug(" Task - {} has already completed", fIndex);
+ } else if(future.isCancelled()) {
+ LOG.debug(" Task - {} has already been cancelled", fIndex);
+ } else if(!future.cancel(true)) {
+ LOG.debug(" Task - {} could not be cancelled", fIndex);
+ } else {
+ LOG.debug(" Task - {} successfully cancelled", fIndex);
+ }
+ }
+ break;
+ } else {
+ ResultWrapper<R> res = futureResult.get();
+ if(res.result != null) {
+ result[res.index] = res.result;
+ } else {
+ LOG.error("Result is null for {}", res.index);
+ completed = false;
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Caught InterruptedException in Parallel.forLoop", e);
+ completed = false;
+ } catch (ExecutionException e) {
+ LOG.error("Caught ExecutionException in Parallel.forLoop", e);
+ completed = false;
+ }
+ // Return parallel loop result
+ return new ParallelLoopResult<R>(completed, Arrays.asList(result));
+ }
+
+ /**
+ * A custom {@link ThreadFactory} for the threads that will handle
+ * {@link org.apache.ambari.server.utils.Parallel} loop iterations.
+ */
+ private static final class ParallelLoopsThreadFactory implements ThreadFactory {
+
+ private static final AtomicInteger threadId = new AtomicInteger(1);
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = Executors.defaultThreadFactory().newThread(r);
+ thread.setName("parallel-loop-" + threadId.getAndIncrement());
+ return thread;
+ }
+ }
+
+ /**
+ * Result wrapper for Parallel.forLoop used internally
+ * @param <R> Type of result to wrap
+ */
+ private static final class ResultWrapper<R> {
+ int index;
+ R result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/25fd9a62/ambari-server/src/main/java/org/apache/ambari/server/utils/ParallelLoopResult.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/ParallelLoopResult.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/ParallelLoopResult.java
new file mode 100644
index 0000000..85ff706
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/ParallelLoopResult.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.utils;
+
+import java.util.List;
+
+/**
+ * Provides completion status and results of a {@link Parallel} loop
+ * @param <R> Result type
+ */
+public class ParallelLoopResult<R> {
+ private boolean isCompleted;
+ private List<R> result;
+
+ /**
+ * Flag to indicate if the parallel loop completed all iterations
+ * @return
+ */
+ public boolean getIsCompleted() {
+ return isCompleted;
+ }
+
+ /**
+ * Flag to indicate if the parallel loop completed all iterations
+ * @return
+ */
+ public void setIsCompleted(boolean completed) {
+ isCompleted = completed;
+ }
+
+ public List<R> getResult() {
+ return result;
+ }
+
+ public void setResult(List<R> result) {
+ this.result = result;
+ }
+
+ /**
+ * Constructor
+ * @param completed Indicates if the parallel loop completed all iterations
+ * @param result Results of parallel loop. Results could be partially completed.
+ */
+ public ParallelLoopResult(boolean completed, List<R> result) {
+ isCompleted = completed;
+ this.result = result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/25fd9a62/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
index 12bca8c..6318552 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
@@ -5486,7 +5486,8 @@ public class AmbariManagementControllerTest {
long requestId2 = startService(clusterName, serviceName1, true, true);
long requestId3 = startService(clusterName, serviceName2, true, true);
- stages = actionDB.getAllStages(requestId2);
+ stages = new ArrayList<>();
+ stages.addAll(actionDB.getAllStages(requestId2));
stages.addAll(actionDB.getAllStages(requestId3));
HostRoleCommand hdfsCmdHost3 = null;
HostRoleCommand hdfsCmdHost2 = null;
http://git-wip-us.apache.org/repos/asf/ambari/blob/25fd9a62/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java b/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java
new file mode 100644
index 0000000..0628f20
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.utils;
+
+import org.junit.Test;
+import junit.framework.Assert;
+
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Collections;
+import java.util.LinkedList;
+
+/**
+ * Tests parallel loops
+ */
+public class TestParallel {
+
+ /**
+ * Tests {@link org.apache.ambari.server.utils.Parallel} forLoop base cases.
+ * @throws Exception
+ */
+ @Test
+ public void testParallelForLoopBaseCases() throws Exception {
+
+ ParallelLoopResult<Integer> nullLoopResult = Parallel.forLoop(
+ null,
+ new LoopBody<Integer, Integer>() {
+ @Override
+ public Integer run(Integer integer) {
+ return integer;
+ }
+ });
+ Assert.assertTrue(nullLoopResult.getIsCompleted());
+ Assert.assertTrue(nullLoopResult.getResult().isEmpty());
+
+ ParallelLoopResult<Integer> emptyLoopResult = Parallel.forLoop(
+ new ArrayList<Integer>(),
+ new LoopBody<Integer, Integer>() {
+ @Override
+ public Integer run(Integer integer) {
+ return integer;
+ }
+ });
+ Assert.assertTrue(emptyLoopResult.getIsCompleted());
+ Assert.assertTrue(emptyLoopResult.getResult().isEmpty());
+
+ ParallelLoopResult<Integer> singleElementLoopResult = Parallel.forLoop(
+ Collections.singletonList(7),
+ new LoopBody<Integer, Integer>() {
+ @Override
+ public Integer run(Integer integer) {
+ return integer;
+ }
+ });
+ Assert.assertTrue(singleElementLoopResult.getIsCompleted());
+ List<Integer> singleElementList = singleElementLoopResult.getResult();
+ Assert.assertTrue(singleElementLoopResult.getIsCompleted());
+ Assert.assertFalse(singleElementList.isEmpty());
+ Assert.assertEquals(1, singleElementList.size());
+ Assert.assertNotNull(singleElementList.get(0));
+ }
+
+ /**
+ * Tests Parallel.forLoop
+ * @throws Exception
+ */
+ @Test
+ public void testParallelForLoop() throws Exception {
+ final List<Integer> input = new LinkedList<Integer>();
+ for(int i = 0; i < 10; i++) {
+ input.add(i);
+ }
+
+ ParallelLoopResult<Integer> loopResult = Parallel.forLoop(input, new LoopBody<Integer, Integer>() {
+ @Override
+ public Integer run(Integer in1) {
+ return in1 * in1;
+ }
+ });
+ Assert.assertTrue(loopResult.getIsCompleted());
+ Assert.assertNotNull(loopResult.getResult());
+
+ List<Integer> output = loopResult.getResult();
+ Assert.assertEquals(input.size(), output.size());
+ for(int i = 0; i < input.size(); i++) {
+ Assert.assertEquals( i * i, (int)output.get(i));
+ }
+ }
+
+ /**
+ * Tests nested {@link org.apache.ambari.server.utils.Parallel} forLoop
+ * @throws Exception
+ */
+ @Test
+ public void testNestedParallelForLoopIterationFailures() throws Exception {
+ final List<Integer> input = new LinkedList<Integer>();
+ for(int i = 0; i < 10; i++) {
+ input.add(i);
+ }
+ final ParallelLoopResult<Integer>[] innerLoopResults = new ParallelLoopResult[input.size()];
+ ParallelLoopResult<Integer> loopResult = Parallel.forLoop(input, new LoopBody<Integer, Integer>() {
+ @Override
+ public Integer run(final Integer in1) {
+ int sq = in1 * in1;
+ ParallelLoopResult<Integer> innerLoopResult = Parallel.forLoop(input, new LoopBody<Integer, Integer>() {
+ @Override
+ public Integer run(Integer in2) {
+ return in1 * in2;
+ }
+ });
+ innerLoopResults[in1] = innerLoopResult;
+ return in1 * in1;
+ }
+ });
+ Assert.assertNotNull(loopResult);
+ Assert.assertTrue(loopResult.getIsCompleted());
+ List<Integer> output = loopResult.getResult();
+ Assert.assertNotNull(output);
+ Assert.assertEquals(input.size(), output.size());
+
+ for(int i = 0; i < input.size(); i++) {
+ Assert.assertEquals(i * i, (int) output.get(i));
+ ParallelLoopResult<Integer> innerLoopResult = innerLoopResults[i];
+ Assert.assertNotNull(innerLoopResult);
+ Assert.assertTrue(innerLoopResult.getIsCompleted());
+ List<Integer> innerOutput = innerLoopResult.getResult();
+ Assert.assertNotNull(innerOutput);
+ Assert.assertEquals(input.size(), innerOutput.size());
+
+ for(int j = 0; j < input.size(); j++) {
+ Assert.assertEquals(i*j, (int) innerOutput.get(j));
+ }
+ }
+ }
+
+ /**
+ * Tests {@link org.apache.ambari.server.utils.Parallel} forLoop iteration failures
+ * @throws Exception
+ */
+ @Test
+ public void testParallelForLoopIterationFailures() throws Exception {
+ final List<Integer> input = new LinkedList<Integer>();
+ for(int i = 0; i < 10; i++) {
+ input.add(i);
+ }
+ final List<Integer> failForList = Arrays.asList(new Integer[] { 2, 5, 7});
+ ParallelLoopResult<Integer> loopResult = Parallel.forLoop(input, new LoopBody<Integer, Integer>() {
+ @Override
+ public Integer run(Integer in1) {
+ if(failForList.contains(in1)) {
+ // Return null
+ return null;
+ }
+ return in1 * in1;
+ }
+ });
+ Assert.assertFalse(loopResult.getIsCompleted());
+ Assert.assertNotNull(loopResult.getResult());
+ List<Integer> output = loopResult.getResult();
+ Assert.assertEquals(input.size(), output.size());
+
+ for(int i = 0; i < input.size(); i++) {
+ if(failForList.contains(i)) {
+ Assert.assertNull(output.get(i));
+ output.set(i, i * i);
+ } else {
+ Assert.assertEquals(i * i, (int) output.get(i));
+ }
+ }
+ }
+}