You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2015/10/02 23:12:15 UTC

[11/32] ambari git commit: AMBARI-13291 - JPA Threads Stuck In ConcurrencyManager Freezes Ambari (jonathanhurley)

AMBARI-13291 - JPA Threads Stuck In ConcurrencyManager Freezes Ambari (jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8cd16dc8
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8cd16dc8
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8cd16dc8

Branch: refs/heads/branch-dev-patch-upgrade
Commit: 8cd16dc8a4c30104f3a0094ac1416ef4fa365498
Parents: 805a2e8
Author: Jonathan Hurley <jh...@hortonworks.com>
Authored: Thu Oct 1 12:44:27 2015 -0400
Committer: Jonathan Hurley <jh...@hortonworks.com>
Committed: Thu Oct 1 14:19:51 2015 -0400

----------------------------------------------------------------------
 .../apache/ambari/annotations/Experimental.java | 35 ++++++++++++
 .../actionmanager/ActionDBAccessorImpl.java     | 48 ++++++++++++-----
 .../server/configuration/Configuration.java     | 57 ++++++++++++++------
 .../server/configuration/ConfigurationTest.java | 18 ++++++-
 4 files changed, 127 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/8cd16dc8/ambari-server/src/main/java/org/apache/ambari/annotations/Experimental.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/annotations/Experimental.java b/ambari-server/src/main/java/org/apache/ambari/annotations/Experimental.java
new file mode 100644
index 0000000..5a4915a
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/annotations/Experimental.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.annotations;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * The {@code Experimental} annotation is used to mark an area of code which
+ * contains untested functionality. This allows areas of code
+ * which may not be completed to be quickly located.
+ */
+@Retention(RetentionPolicy.SOURCE)
+@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.CONSTRUCTOR,
+    ElementType.ANNOTATION_TYPE, ElementType.PACKAGE, ElementType.FIELD,
+    ElementType.LOCAL_VARIABLE })
+public @interface Experimental {
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/8cd16dc8/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 0f439de..d482ce9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -27,9 +27,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.ambari.annotations.Experimental;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
 import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
 import org.apache.ambari.server.orm.dao.HostDAO;
@@ -105,6 +107,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   @Inject
   RequestScheduleDAO requestScheduleDAO;
 
+  @Inject
+  Configuration configuration;
+
   private Cache<Long, HostRoleCommand> hostRoleCommandCache;
   private long cacheLimit; //may be exceeded to store tasks from one request
 
@@ -211,23 +216,38 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
    * {@inheritDoc}
    */
   @Override
+  @Experimental
   public List<Stage> getStagesInProgress() {
-    List<StageEntity> stageEntities = stageDAO.findByCommandStatuses(HostRoleStatus.IN_PROGRESS_STATUSES);
-    ParallelLoopResult<Stage> loopResult = Parallel.forLoop(stageEntities, new LoopBody<StageEntity, Stage>() {
-      @Override
-      public Stage run(StageEntity stageEntity) {
-        return stageFactory.createExisting(stageEntity);
+    List<StageEntity> stageEntities = stageDAO.findByCommandStatuses(
+        HostRoleStatus.IN_PROGRESS_STATUSES);
+
+    // experimentally enable parallel stage processing
+    @Experimental
+    boolean useConcurrentStageProcessing = configuration.isExperimentalConcurrentStageProcessingEnabled();
+    if (useConcurrentStageProcessing) {
+      ParallelLoopResult<Stage> loopResult = Parallel.forLoop(stageEntities,
+          new LoopBody<StageEntity, Stage>() {
+            @Override
+            public Stage run(StageEntity stageEntity) {
+              return stageFactory.createExisting(stageEntity);
+            }
+          });
+      if (loopResult.getIsCompleted()) {
+        return loopResult.getResult();
+      } else {
+        // Fetch any missing results sequentially
+        List<Stage> stages = loopResult.getResult();
+        for (int i = 0; i < stages.size(); i++) {
+          if (stages.get(i) == null) {
+            stages.set(i, stageFactory.createExisting(stageEntities.get(i)));
+          }
+        }
+        return stages;
       }
-    });
-    if(loopResult.getIsCompleted()) {
-      return loopResult.getResult();
     } else {
-      // Fetch any missing results sequentially
-      List<Stage> stages = loopResult.getResult();
-      for(int i = 0; i < stages.size(); i++) {
-        if(stages.get(i) == null) {
-          stages.set(i, stageFactory.createExisting(stageEntities.get(i)));
-        }
+      List<Stage> stages = new ArrayList<>(stageEntities.size());
+      for (StageEntity stageEntity : stageEntities) {
+        stages.add(stageFactory.createExisting(stageEntity));
       }
       return stages;
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/8cd16dc8/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index 67dcfa5..5bc3276 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -17,15 +17,27 @@
  */
 package org.apache.ambari.server.configuration;
 
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.ambari.annotations.Experimental;
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.actionmanager.Stage;
 import org.apache.ambari.server.orm.JPATableGenerationStrategy;
 import org.apache.ambari.server.orm.PersistenceType;
+import org.apache.ambari.server.orm.entities.StageEntity;
 import org.apache.ambari.server.security.ClientSecurityType;
 import org.apache.ambari.server.security.authorization.LdapServerProperties;
 import org.apache.ambari.server.security.encryption.CredentialProvider;
 import org.apache.ambari.server.state.stack.OsFamily;
+import org.apache.ambari.server.utils.Parallel;
 import org.apache.ambari.server.utils.ShellCommandUtil;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.RandomStringUtils;
@@ -33,15 +45,8 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
 
 
 /**
@@ -187,7 +192,7 @@ public class Configuration {
   public static final String SERVER_JDBC_CONNECTION_POOL_IDLE_TEST_INTERVAL = "server.jdbc.connection-pool.idle-test-interval";
   public static final String SERVER_JDBC_CONNECTION_POOL_ACQUISITION_RETRY_ATTEMPTS = "server.jdbc.connection-pool.acquisition-retry-attempts";
   public static final String SERVER_JDBC_CONNECTION_POOL_ACQUISITION_RETRY_DELAY = "server.jdbc.connection-pool.acquisition-retry-delay";
-  
+
   public static final String SERVER_JDBC_RCA_USER_NAME_KEY = "server.jdbc.rca.user.name";
   public static final String SERVER_JDBC_RCA_USER_PASSWD_KEY = "server.jdbc.rca.user.passwd";
   public static final String SERVER_JDBC_RCA_DRIVER_KEY = "server.jdbc.rca.driver";
@@ -430,6 +435,14 @@ public class Configuration {
   private static final String TIMELINE_METRICS_CACHE_HEAP_PERCENT = "server.timeline.metrics.cache.heap.percent";
   private static final String DEFAULT_TIMELINE_METRICS_CACHE_HEAP_PERCENT = "15%";
 
+  // experimental options
+
+  /**
+   * Governs the use of {@link Parallel} to process {@link StageEntity}
+   * instances into {@link Stage}.
+   */
+  protected static final String EXPERIMENTAL_CONCURRENCY_STAGE_PROCESSING_ENABLED = "experimental.concurrency.stage_processing.enabled";
+
   /**
    * The full path to the XML file that describes the different alert templates.
    */
@@ -1522,7 +1535,7 @@ public class Configuration {
     }
     return value;
   }
-  
+
   /**
    * @param isPackageInstallationTask true, if task is for installing packages
    * @return default task timeout in seconds (string representation). This value
@@ -1813,7 +1826,7 @@ public class Configuration {
         SERVER_JDBC_CONNECTION_POOL_MAX_IDLE_TIME_EXCESS,
         DEFAULT_JDBC_POOL_EXCESS_MAX_IDLE_TIME_SECONDS));
   }
-  
+
   /**
    * Gets the number of connections that should be retrieved when the pool size
    * must increase. It's wise to set this higher than 1 since the assumption is
@@ -1838,7 +1851,7 @@ public class Configuration {
         SERVER_JDBC_CONNECTION_POOL_ACQUISITION_RETRY_ATTEMPTS,
         DEFAULT_JDBC_POOL_ACQUISITION_RETRY_ATTEMPTS));
   }
-  
+
   /**
    * Gets the delay in milliseconds between connection acquire attempts.
    *
@@ -1962,4 +1975,18 @@ public class Configuration {
 
     return percent.trim().endsWith("%") ? percent.trim() : percent.trim() + "%";
   }
+
+  /**
+   * Gets whether to use experiemental concurrent processing to convert
+   * {@link StageEntity} instances into {@link Stage} instances. The default is
+   * {@code false}.
+   *
+   * @return {code true} if the experimental feature is enabled, {@code false}
+   *         otherwise.
+   */
+  @Experimental
+  public boolean isExperimentalConcurrentStageProcessingEnabled() {
+    return Boolean.parseBoolean(properties.getProperty(
+        EXPERIMENTAL_CONCURRENCY_STAGE_PROCESSING_ENABLED, Boolean.FALSE.toString()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/8cd16dc8/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java b/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
index 074fbb4..bd02749 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
@@ -32,8 +32,6 @@ import java.lang.reflect.Method;
 import java.util.Map;
 import java.util.Properties;
 
-import junit.framework.Assert;
-
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.configuration.Configuration.ConnectionPoolType;
 import org.apache.ambari.server.configuration.Configuration.DatabaseType;
@@ -58,6 +56,8 @@ import com.google.inject.Guice;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
 
+import junit.framework.Assert;
+
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ Configuration.class })
 @PowerMockIgnore( {"javax.management.*", "javax.crypto.*"})
@@ -496,4 +496,18 @@ public class ConfigurationTest {
     ambariProperties.setProperty(Configuration.AGENT_PACKAGE_PARALLEL_COMMANDS_LIMIT_KEY, "0");
     Assert.assertEquals(1, configuration.getAgentPackageParallelCommandsLimit());
   }
+
+  @Test
+  public void testExperimentalConcurrentStageProcessing() throws Exception {
+    final Properties ambariProperties = new Properties();
+    final Configuration configuration = new Configuration(ambariProperties);
+
+    Assert.assertFalse(configuration.isExperimentalConcurrentStageProcessingEnabled());
+
+    ambariProperties.setProperty(Configuration.EXPERIMENTAL_CONCURRENCY_STAGE_PROCESSING_ENABLED,
+        Boolean.TRUE.toString());
+
+    Assert.assertTrue(configuration.isExperimentalConcurrentStageProcessingEnabled());
+
+  }
 }