You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2015/10/02 23:12:15 UTC
[11/32] ambari git commit: AMBARI-13291 - JPA Threads Stuck In
ConcurrencyManager Freezes Ambari (jonathanhurley)
AMBARI-13291 - JPA Threads Stuck In ConcurrencyManager Freezes Ambari (jonathanhurley)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8cd16dc8
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8cd16dc8
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8cd16dc8
Branch: refs/heads/branch-dev-patch-upgrade
Commit: 8cd16dc8a4c30104f3a0094ac1416ef4fa365498
Parents: 805a2e8
Author: Jonathan Hurley <jh...@hortonworks.com>
Authored: Thu Oct 1 12:44:27 2015 -0400
Committer: Jonathan Hurley <jh...@hortonworks.com>
Committed: Thu Oct 1 14:19:51 2015 -0400
----------------------------------------------------------------------
.../apache/ambari/annotations/Experimental.java | 35 ++++++++++++
.../actionmanager/ActionDBAccessorImpl.java | 48 ++++++++++++-----
.../server/configuration/Configuration.java | 57 ++++++++++++++------
.../server/configuration/ConfigurationTest.java | 18 ++++++-
4 files changed, 127 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/8cd16dc8/ambari-server/src/main/java/org/apache/ambari/annotations/Experimental.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/annotations/Experimental.java b/ambari-server/src/main/java/org/apache/ambari/annotations/Experimental.java
new file mode 100644
index 0000000..5a4915a
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/annotations/Experimental.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.annotations;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * The {@code Experimental} annotation is used to mark an area of code which
+ * contains untested functionality. This allows areas of code
+ * which may not be completed to be quickly located.
+ */
+@Retention(RetentionPolicy.SOURCE)
+@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.CONSTRUCTOR,
+ ElementType.ANNOTATION_TYPE, ElementType.PACKAGE, ElementType.FIELD,
+ ElementType.LOCAL_VARIABLE })
+public @interface Experimental {
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8cd16dc8/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 0f439de..d482ce9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -27,9 +27,11 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.ambari.annotations.Experimental;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.agent.CommandReport;
import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.orm.dao.ClusterDAO;
import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
import org.apache.ambari.server.orm.dao.HostDAO;
@@ -105,6 +107,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
@Inject
RequestScheduleDAO requestScheduleDAO;
+ @Inject
+ Configuration configuration;
+
private Cache<Long, HostRoleCommand> hostRoleCommandCache;
private long cacheLimit; //may be exceeded to store tasks from one request
@@ -211,23 +216,38 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
* {@inheritDoc}
*/
@Override
+ @Experimental
public List<Stage> getStagesInProgress() {
- List<StageEntity> stageEntities = stageDAO.findByCommandStatuses(HostRoleStatus.IN_PROGRESS_STATUSES);
- ParallelLoopResult<Stage> loopResult = Parallel.forLoop(stageEntities, new LoopBody<StageEntity, Stage>() {
- @Override
- public Stage run(StageEntity stageEntity) {
- return stageFactory.createExisting(stageEntity);
+ List<StageEntity> stageEntities = stageDAO.findByCommandStatuses(
+ HostRoleStatus.IN_PROGRESS_STATUSES);
+
+ // experimentally enable parallel stage processing
+ @Experimental
+ boolean useConcurrentStageProcessing = configuration.isExperimentalConcurrentStageProcessingEnabled();
+ if (useConcurrentStageProcessing) {
+ ParallelLoopResult<Stage> loopResult = Parallel.forLoop(stageEntities,
+ new LoopBody<StageEntity, Stage>() {
+ @Override
+ public Stage run(StageEntity stageEntity) {
+ return stageFactory.createExisting(stageEntity);
+ }
+ });
+ if (loopResult.getIsCompleted()) {
+ return loopResult.getResult();
+ } else {
+ // Fetch any missing results sequentially
+ List<Stage> stages = loopResult.getResult();
+ for (int i = 0; i < stages.size(); i++) {
+ if (stages.get(i) == null) {
+ stages.set(i, stageFactory.createExisting(stageEntities.get(i)));
+ }
+ }
+ return stages;
}
- });
- if(loopResult.getIsCompleted()) {
- return loopResult.getResult();
} else {
- // Fetch any missing results sequentially
- List<Stage> stages = loopResult.getResult();
- for(int i = 0; i < stages.size(); i++) {
- if(stages.get(i) == null) {
- stages.set(i, stageFactory.createExisting(stageEntities.get(i)));
- }
+ List<Stage> stages = new ArrayList<>(stageEntities.size());
+ for (StageEntity stageEntity : stageEntities) {
+ stages.add(stageFactory.createExisting(stageEntity));
}
return stages;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8cd16dc8/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index 67dcfa5..5bc3276 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -17,15 +17,27 @@
*/
package org.apache.ambari.server.configuration;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.ambari.annotations.Experimental;
import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.actionmanager.Stage;
import org.apache.ambari.server.orm.JPATableGenerationStrategy;
import org.apache.ambari.server.orm.PersistenceType;
+import org.apache.ambari.server.orm.entities.StageEntity;
import org.apache.ambari.server.security.ClientSecurityType;
import org.apache.ambari.server.security.authorization.LdapServerProperties;
import org.apache.ambari.server.security.encryption.CredentialProvider;
import org.apache.ambari.server.state.stack.OsFamily;
+import org.apache.ambari.server.utils.Parallel;
import org.apache.ambari.server.utils.ShellCommandUtil;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.RandomStringUtils;
@@ -33,15 +45,8 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
/**
@@ -187,7 +192,7 @@ public class Configuration {
public static final String SERVER_JDBC_CONNECTION_POOL_IDLE_TEST_INTERVAL = "server.jdbc.connection-pool.idle-test-interval";
public static final String SERVER_JDBC_CONNECTION_POOL_ACQUISITION_RETRY_ATTEMPTS = "server.jdbc.connection-pool.acquisition-retry-attempts";
public static final String SERVER_JDBC_CONNECTION_POOL_ACQUISITION_RETRY_DELAY = "server.jdbc.connection-pool.acquisition-retry-delay";
-
+
public static final String SERVER_JDBC_RCA_USER_NAME_KEY = "server.jdbc.rca.user.name";
public static final String SERVER_JDBC_RCA_USER_PASSWD_KEY = "server.jdbc.rca.user.passwd";
public static final String SERVER_JDBC_RCA_DRIVER_KEY = "server.jdbc.rca.driver";
@@ -430,6 +435,14 @@ public class Configuration {
private static final String TIMELINE_METRICS_CACHE_HEAP_PERCENT = "server.timeline.metrics.cache.heap.percent";
private static final String DEFAULT_TIMELINE_METRICS_CACHE_HEAP_PERCENT = "15%";
+ // experimental options
+
+ /**
+ * Governs the use of {@link Parallel} to process {@link StageEntity}
+ * instances into {@link Stage}.
+ */
+ protected static final String EXPERIMENTAL_CONCURRENCY_STAGE_PROCESSING_ENABLED = "experimental.concurrency.stage_processing.enabled";
+
/**
* The full path to the XML file that describes the different alert templates.
*/
@@ -1522,7 +1535,7 @@ public class Configuration {
}
return value;
}
-
+
/**
* @param isPackageInstallationTask true, if task is for installing packages
* @return default task timeout in seconds (string representation). This value
@@ -1813,7 +1826,7 @@ public class Configuration {
SERVER_JDBC_CONNECTION_POOL_MAX_IDLE_TIME_EXCESS,
DEFAULT_JDBC_POOL_EXCESS_MAX_IDLE_TIME_SECONDS));
}
-
+
/**
* Gets the number of connections that should be retrieved when the pool size
* must increase. It's wise to set this higher than 1 since the assumption is
@@ -1838,7 +1851,7 @@ public class Configuration {
SERVER_JDBC_CONNECTION_POOL_ACQUISITION_RETRY_ATTEMPTS,
DEFAULT_JDBC_POOL_ACQUISITION_RETRY_ATTEMPTS));
}
-
+
/**
* Gets the delay in milliseconds between connection acquire attempts.
*
@@ -1962,4 +1975,18 @@ public class Configuration {
return percent.trim().endsWith("%") ? percent.trim() : percent.trim() + "%";
}
+
+ /**
+ * Gets whether to use experimental concurrent processing to convert
+ * {@link StageEntity} instances into {@link Stage} instances. The default is
+ * {@code false}.
+ *
+ * @return {@code true} if the experimental feature is enabled, {@code false}
+ * otherwise.
+ */
+ @Experimental
+ public boolean isExperimentalConcurrentStageProcessingEnabled() {
+ return Boolean.parseBoolean(properties.getProperty(
+ EXPERIMENTAL_CONCURRENCY_STAGE_PROCESSING_ENABLED, Boolean.FALSE.toString()));
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8cd16dc8/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java b/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
index 074fbb4..bd02749 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
@@ -32,8 +32,6 @@ import java.lang.reflect.Method;
import java.util.Map;
import java.util.Properties;
-import junit.framework.Assert;
-
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.configuration.Configuration.ConnectionPoolType;
import org.apache.ambari.server.configuration.Configuration.DatabaseType;
@@ -58,6 +56,8 @@ import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
+import junit.framework.Assert;
+
@RunWith(PowerMockRunner.class)
@PrepareForTest({ Configuration.class })
@PowerMockIgnore( {"javax.management.*", "javax.crypto.*"})
@@ -496,4 +496,18 @@ public class ConfigurationTest {
ambariProperties.setProperty(Configuration.AGENT_PACKAGE_PARALLEL_COMMANDS_LIMIT_KEY, "0");
Assert.assertEquals(1, configuration.getAgentPackageParallelCommandsLimit());
}
+
+ @Test
+ public void testExperimentalConcurrentStageProcessing() throws Exception {
+ final Properties ambariProperties = new Properties();
+ final Configuration configuration = new Configuration(ambariProperties);
+
+ Assert.assertFalse(configuration.isExperimentalConcurrentStageProcessingEnabled());
+
+ ambariProperties.setProperty(Configuration.EXPERIMENTAL_CONCURRENCY_STAGE_PROCESSING_ENABLED,
+ Boolean.TRUE.toString());
+
+ Assert.assertTrue(configuration.isExperimentalConcurrentStageProcessingEnabled());
+
+ }
}