You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ds...@apache.org on 2018/09/26 10:54:04 UTC

[ambari] branch trunk updated: [AMBARI-24682] Rolling Restarts: Option to specify number of failures per batch (dsen) (#2371)

This is an automated email from the ASF dual-hosted git repository.

dsen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 46711f8  [AMBARI-24682] Rolling Restarts: Option to specify number of failures per batch (dsen) (#2371)
46711f8 is described below

commit 46711f8259e388868ce2a58845e7edc92c199b1a
Author: Dmitry Sen <ds...@apache.org>
AuthorDate: Wed Sep 26 13:53:58 2018 +0300

    [AMBARI-24682] Rolling Restarts: Option to specify number of failures per batch (dsen) (#2371)
---
 .../internal/RequestScheduleResourceProvider.java  |  7 ++
 .../server/orm/entities/RequestScheduleEntity.java | 11 ++++
 .../server/scheduler/ExecutionScheduleManager.java | 16 +++--
 .../server/state/scheduler/BatchRequestJob.java    |  7 +-
 .../server/state/scheduler/BatchSettings.java      | 10 +++
 .../state/scheduler/RequestExecutionImpl.java      |  2 +
 .../ambari/server/upgrade/SchemaUpgradeHelper.java |  1 +
 .../ambari/server/upgrade/UpgradeCatalog280.java   | 73 +++++++++++++++++++++
 .../src/main/resources/Ambari-DDL-Derby-CREATE.sql |  1 +
 .../src/main/resources/Ambari-DDL-MySQL-CREATE.sql |  1 +
 .../main/resources/Ambari-DDL-Oracle-CREATE.sql    |  1 +
 .../main/resources/Ambari-DDL-Postgres-CREATE.sql  |  1 +
 .../resources/Ambari-DDL-SQLAnywhere-CREATE.sql    |  1 +
 .../main/resources/Ambari-DDL-SQLServer-CREATE.sql |  1 +
 .../scheduler/ExecutionScheduleManagerTest.java    | 49 ++++++++++++++
 .../ambari/server/state/RequestExecutionTest.java  |  5 ++
 .../state/scheduler/BatchRequestJobTest.java       |  1 +
 .../server/upgrade/UpgradeCatalog280Test.java      | 74 ++++++++++++++++++++++
 18 files changed, 256 insertions(+), 6 deletions(-)

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
index 15ae9dd..18ce73b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
@@ -78,6 +78,7 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
 
   public static final String BATCH_SEPARATION_IN_SECONDS_PROPERTY_ID = "batch_separation_in_seconds";
   public static final String TASK_FAILURE_TOLERANCE_PROPERTY_ID = "task_failure_tolerance";
+  public static final String TASK_FAILURE_TOLERANCE_PER_BATCH_PROPERTY_ID = "task_failure_tolerance_per_batch";
   public static final String TASK_FAILURE_TOLERANCE_LIMIT_PROPERTY_ID = "task_failure_tolerance_limit";
   public static final String REQUESTS_PROPERTY_ID = "requests";
 
@@ -122,6 +123,7 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
           BATCH_SEPARATION_IN_SECONDS_PROPERTY_ID);
   public static final String TASK_FAILURE_TOLERANCE = PropertyHelper.getPropertyId(BATCH_SETTINGS,
           TASK_FAILURE_TOLERANCE_PROPERTY_ID);
+  public static final String TASK_FAILURE_TOLERANCE_PER_BATCH = PropertyHelper.getPropertyId(BATCH_SETTINGS, TASK_FAILURE_TOLERANCE_PER_BATCH_PROPERTY_ID);
   public static final String REQUESTS = PropertyHelper.getPropertyId(null, REQUESTS_PROPERTY_ID);
 
   public static final String TYPE = PropertyHelper.getPropertyId(null, TYPE_PROPERTY_ID);
@@ -164,6 +166,7 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
     UPDATE_TIME,
     BATCH_SEPARATION_IN_SECONDS,
     TASK_FAILURE_TOLERANCE,
+    TASK_FAILURE_TOLERANCE_PER_BATCH,
     REQUESTS,
     TYPE,
     URI,
@@ -633,6 +636,10 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
                   (TASK_FAILURE_TOLERANCE)) {
                 batchSettings.setTaskFailureToleranceLimit(Integer.valueOf
                   ((String) batchMapEntry.getValue()));
+              }  else if (batchMapEntry.getKey().equals
+                  (TASK_FAILURE_TOLERANCE_PER_BATCH)) {
+                batchSettings.setTaskFailureToleranceLimitPerBatch(Integer.valueOf
+                    ((String) batchMapEntry.getValue()));
               } else if (batchMapEntry.getKey().equals
                   (BATCH_SEPARATION_IN_SECONDS)) {
                 batchSettings.setBatchSeparationInSeconds(Integer.valueOf
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleEntity.java
index 84196a7..8270d21 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleEntity.java
@@ -70,6 +70,9 @@ public class RequestScheduleEntity {
   @Column(name = "batch_toleration_limit")
   private Integer batchTolerationLimit;
 
+  @Column(name = "batch_toleration_limit_per_batch")
+  private Integer batchTolerationLimitPerBatch;
+
   @Column(name = "authenticated_user_id")
   private Integer authenticatedUserId;
 
@@ -327,4 +330,12 @@ public class RequestScheduleEntity {
     result = 31 * result + clusterId.hashCode();
     return result;
   }
+
+  public Integer getBatchTolerationLimitPerBatch() {
+    return batchTolerationLimitPerBatch;
+  }
+
+  public void setBatchTolerationLimitPerBatch(Integer batchTolerationLimitPerBatch) {
+    this.batchTolerationLimitPerBatch = batchTolerationLimitPerBatch;
+  }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
index e804961..4f03a3e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
@@ -712,13 +712,19 @@ public class ExecutionScheduleManager {
     }
 
     BatchSettings batchSettings = requestExecution.getBatch().getBatchSettings();
-    if (batchSettings != null
-        && batchSettings.getTaskFailureToleranceLimit() != null) {
-      return taskCounts.get(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY) >
-        batchSettings.getTaskFailureToleranceLimit();
+
+    boolean result = false;
+    if (batchSettings != null) {
+      if (batchSettings.getTaskFailureToleranceLimit() != null) {
+        result = taskCounts.get(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY) > batchSettings.getTaskFailureToleranceLimit();
+      }
+      if (batchSettings.getTaskFailureToleranceLimitPerBatch() != null) {
+        result = result || taskCounts.get(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_IN_CURRENT_BATCH_KEY) >
+            batchSettings.getTaskFailureToleranceLimitPerBatch();
+      }
     }
 
-    return false;
+    return result;
   }
 
   /**
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
index bbe8e52..56f121f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
@@ -45,6 +45,8 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
     "BatchRequestJob.ClusterName";
   public static final String BATCH_REQUEST_FAILED_TASKS_KEY =
     "BatchRequestJob.FailedTaskCount";
+  public static final String BATCH_REQUEST_FAILED_TASKS_IN_CURRENT_BATCH_KEY =
+    "BatchRequestJob.FailedTaskInCurrentBatchCount";
   public static final String BATCH_REQUEST_TOTAL_TASKS_KEY =
     "BatchRequestJob.TotalTaskCount";
 
@@ -110,7 +112,8 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
         throw new AmbariException("Task failure tolerance limit exceeded"
             + ", execution_id = " + executionId
             + ", processed batch_id = " + batchId
-            + ", failed tasks = " + aggregateCounts.get(BATCH_REQUEST_FAILED_TASKS_KEY)
+            + ", failed tasks in current batch = " + aggregateCounts.get(BATCH_REQUEST_FAILED_TASKS_IN_CURRENT_BATCH_KEY)
+            + ", failed tasks total = " + aggregateCounts.get(BATCH_REQUEST_FAILED_TASKS_KEY)
             + ", total tasks completed = " + aggregateCounts.get(BATCH_REQUEST_TOTAL_TASKS_KEY));
       }
     }
@@ -153,9 +156,11 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
         batchRequestResponse.getTotalTaskCount();
 
       taskCounts.put(BATCH_REQUEST_FAILED_TASKS_KEY, failedCount);
+      taskCounts.put(BATCH_REQUEST_FAILED_TASKS_IN_CURRENT_BATCH_KEY, batchRequestResponse.getFailedTaskCount());
       taskCounts.put(BATCH_REQUEST_TOTAL_TASKS_KEY, totalCount);
 
       properties.put(BATCH_REQUEST_FAILED_TASKS_KEY, failedCount);
+      properties.put(BATCH_REQUEST_FAILED_TASKS_IN_CURRENT_BATCH_KEY, batchRequestResponse.getFailedTaskCount());
       properties.put(BATCH_REQUEST_TOTAL_TASKS_KEY, totalCount);
     }
 
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchSettings.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchSettings.java
index d1ce992..c7074bb 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchSettings.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchSettings.java
@@ -23,6 +23,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
 public class BatchSettings {
   private Integer batchSeparationInSeconds;
   private Integer taskFailureTolerance;
+  private Integer taskFailureTolerancePerBatch;
 
   @JsonSerialize(include = JsonSerialize.Inclusion.NON_EMPTY)
   @JsonProperty("batch_separation_in_seconds")
@@ -44,4 +45,13 @@ public class BatchSettings {
     this.taskFailureTolerance = taskFailureTolerance;
   }
 
+  @JsonSerialize(include = JsonSerialize.Inclusion.NON_EMPTY)
+  @JsonProperty("task_failure_tolerance_limit_per_batch")
+  public Integer getTaskFailureToleranceLimitPerBatch() {
+    return taskFailureTolerancePerBatch;
+  }
+
+  public void setTaskFailureToleranceLimitPerBatch(Integer taskFailureTolerancePerBatch) {
+    this.taskFailureTolerancePerBatch = taskFailureTolerancePerBatch;
+  }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
index 3436203..104ca9b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
@@ -104,6 +104,7 @@ public class RequestExecutionImpl implements RequestExecution {
     BatchSettings batchSettings = new BatchSettings();
     batchSettings.setBatchSeparationInSeconds(requestScheduleEntity.getBatchSeparationInSeconds());
     batchSettings.setTaskFailureToleranceLimit(requestScheduleEntity.getBatchTolerationLimit());
+    batchSettings.setTaskFailureToleranceLimitPerBatch(requestScheduleEntity.getBatchTolerationLimitPerBatch());
 
     batch.setBatchSettings(batchSettings);
 
@@ -314,6 +315,7 @@ public class RequestExecutionImpl implements RequestExecution {
       if (settings != null) {
         requestScheduleEntity.setBatchSeparationInSeconds(settings.getBatchSeparationInSeconds());
         requestScheduleEntity.setBatchTolerationLimit(settings.getTaskFailureToleranceLimit());
+        requestScheduleEntity.setBatchTolerationLimitPerBatch(settings.getTaskFailureToleranceLimitPerBatch());
       }
     }
   }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java
index 1b3aedf..7563975 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java
@@ -191,6 +191,7 @@ public class SchemaUpgradeHelper {
       catalogBinder.addBinding().to(UpgradeCatalog270.class);
       catalogBinder.addBinding().to(UpgradeCatalog271.class);
       catalogBinder.addBinding().to(UpgradeCatalog272.class);
+      catalogBinder.addBinding().to(UpgradeCatalog280.class);
       catalogBinder.addBinding().to(UpdateAlertScriptPaths.class);
       catalogBinder.addBinding().to(FinalUpgradeCatalog.class);
 
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog280.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog280.java
new file mode 100644
index 0000000..b257f2d
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog280.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.upgrade;
+
+import java.sql.SQLException;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.orm.DBAccessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+
+
+/**
+ * The {@link UpgradeCatalog280} upgrades Ambari from 2.7.2 to 2.8.0.
+ */
+public class UpgradeCatalog280 extends AbstractUpgradeCatalog {
+
+  private static final Logger LOG = LoggerFactory.getLogger(UpgradeCatalog280.class);
+
+  private static final String REQUEST_SCHEDULE_TABLE_NAME = "requestschedule";
+  private static final String REQUEST_SCHEDULE_BATCH_TOLERATION_LIMIT_PER_BATCH_COLUMN_NAME = "batch_toleration_limit_per_batch";
+  @Inject
+  public UpgradeCatalog280(Injector injector) {
+    super(injector);
+  }
+
+  @Override
+  public String getSourceVersion() {
+    return "2.7.2";
+  }
+
+  @Override
+  public String getTargetVersion() {
+    return "2.8.0";
+  }
+
+  @Override
+  protected void executeDDLUpdates() throws AmbariException, SQLException {
+    addComulnsToRequestscheduleTable();
+  }
+
+  @Override
+  protected void executePreDMLUpdates() throws AmbariException, SQLException {
+  }
+
+  @Override
+  protected void executeDMLUpdates() throws AmbariException, SQLException {
+  }
+
+  protected void addComulnsToRequestscheduleTable() throws SQLException {
+    dbAccessor.addColumn(REQUEST_SCHEDULE_TABLE_NAME,
+        new DBAccessor.DBColumnInfo(REQUEST_SCHEDULE_BATCH_TOLERATION_LIMIT_PER_BATCH_COLUMN_NAME, Short.class, null,
+            null, true));
+  }
+}
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
index 80af3a1..064807b 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
@@ -344,6 +344,7 @@ CREATE TABLE requestschedule (
   status varchar(255),
   batch_separation_seconds smallint,
   batch_toleration_limit smallint,
+  batch_toleration_limit_per_batch smallint,
   authenticated_user_id INTEGER,
   create_user varchar(255),
   create_timestamp bigint,
diff --git a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
index bd2edf1..ccc0851 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
@@ -365,6 +365,7 @@ CREATE TABLE requestschedule (
   status varchar(255),
   batch_separation_seconds smallint,
   batch_toleration_limit smallint,
+  batch_toleration_limit_per_batch smallint,
   authenticated_user_id INTEGER,
   create_user varchar(255),
   create_timestamp bigint,
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
index c4fc7fb..5f315f1 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
@@ -345,6 +345,7 @@ CREATE TABLE requestschedule (
   status VARCHAR2(255),
   batch_separation_seconds smallint,
   batch_toleration_limit smallint,
+  batch_toleration_limit_per_batch smallint,
   authenticated_user_id NUMBER(10),
   create_user VARCHAR2(255),
   create_timestamp NUMBER(19),
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
index 7fd9e68..f40f940 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
@@ -346,6 +346,7 @@ CREATE TABLE requestschedule (
   status varchar(255),
   batch_separation_seconds smallint,
   batch_toleration_limit smallint,
+  batch_toleration_limit_per_batch smallint,
   authenticated_user_id INTEGER,
   create_user varchar(255),
   create_timestamp bigint,
diff --git a/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
index 6303cb1..f957031 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
@@ -343,6 +343,7 @@ CREATE TABLE requestschedule (
   status VARCHAR(255),
   batch_separation_seconds smallint,
   batch_toleration_limit smallint,
+  batch_toleration_limit_per_batch smallint,
   authenticated_user_id INTEGER,
   create_user VARCHAR(255),
   create_timestamp NUMERIC(19),
diff --git a/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
index f080e51..ba59961 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
@@ -349,6 +349,7 @@ CREATE TABLE requestschedule (
   STATUS VARCHAR(255),
   batch_separation_seconds SMALLINT,
   batch_toleration_limit SMALLINT,
+  batch_toleration_limit_per_batch SMALLINT,
   authenticated_user_id INTEGER,
   create_user VARCHAR(255),
   create_timestamp BIGINT,
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
index bc1ab47..3f4a5d5 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
@@ -172,6 +172,7 @@ public class ExecutionScheduleManagerTest {
 
     BatchSettings batchSettings = new BatchSettings();
     batchSettings.setTaskFailureToleranceLimit(10);
+    batchSettings.setTaskFailureToleranceLimitPerBatch(1);
     batches.setBatchSettings(batchSettings);
 
     List<BatchRequest> batchRequests = new ArrayList<>();
@@ -512,6 +513,7 @@ public class ExecutionScheduleManagerTest {
 
     BatchSettings batchSettings = new BatchSettings();
     batchSettings.setTaskFailureToleranceLimit(1);
+    batchSettings.setTaskFailureToleranceLimitPerBatch(1);
 
     Map<Long, RequestExecution> executionMap = new HashMap<>();
     executionMap.put(executionId, requestExecutionMock);
@@ -542,6 +544,53 @@ public class ExecutionScheduleManagerTest {
       executionSchedulerMock, batchMock);
   }
 
+  @Test
+  public void testHasToleranceThresholdPerBatchExceeded() throws Exception {
+    Clusters clustersMock = createMock(Clusters.class);
+    Cluster clusterMock = createMock(Cluster.class);
+    Configuration configurationMock = createNiceMock(Configuration.class);
+    ExecutionScheduler executionSchedulerMock = createMock(ExecutionScheduler.class);
+    InternalTokenStorage tokenStorageMock = createMock(InternalTokenStorage.class);
+    ActionDBAccessor actionDBAccessorMock = createMock(ActionDBAccessor.class);
+    Gson gson = new Gson();
+    RequestExecution requestExecutionMock = createMock(RequestExecution.class);
+    Batch batchMock = createMock(Batch.class);
+
+    long executionId = 11L;
+    String clusterName = "c1";
+
+    BatchSettings batchSettings = new BatchSettings();
+    batchSettings.setTaskFailureToleranceLimitPerBatch(1);
+
+    Map<Long, RequestExecution> executionMap = new HashMap<>();
+    executionMap.put(executionId, requestExecutionMock);
+
+    expect(clustersMock.getCluster(clusterName)).andReturn(clusterMock).anyTimes();
+    expect(clusterMock.getAllRequestExecutions()).andReturn(executionMap).anyTimes();
+    expect(requestExecutionMock.getBatch()).andReturn(batchMock).anyTimes();
+    expect(batchMock.getBatchSettings()).andReturn(batchSettings).anyTimes();
+
+    replay(clustersMock, clusterMock, configurationMock, requestExecutionMock,
+        executionSchedulerMock, batchMock);
+
+    ExecutionScheduleManager scheduleManager =
+        new ExecutionScheduleManager(configurationMock, executionSchedulerMock,
+            tokenStorageMock, clustersMock, actionDBAccessorMock, gson);
+
+    HashMap<String, Integer> taskCounts = new HashMap<String, Integer>() {{
+      put(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_IN_CURRENT_BATCH_KEY, 2);
+      put(BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY, 10);
+    }};
+
+    boolean exceeded = scheduleManager.hasToleranceThresholdExceeded
+      (executionId, clusterName, taskCounts);
+
+    Assert.assertTrue(exceeded);
+
+    verify(clustersMock, clusterMock, configurationMock, requestExecutionMock,
+      executionSchedulerMock, batchMock);
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void testFinalizeBatch() throws Exception {
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
index 711cb7a..bdcdb76 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
@@ -88,6 +88,7 @@ public class RequestExecutionTest {
 
     BatchSettings batchSettings = new BatchSettings();
     batchSettings.setTaskFailureToleranceLimit(10);
+    batchSettings.setTaskFailureToleranceLimitPerBatch(2);
     batches.setBatchSettings(batchSettings);
 
     List<BatchRequest> batchRequests = new ArrayList<>();
@@ -132,6 +133,8 @@ public class RequestExecutionTest {
     Assert.assertNotNull(scheduleEntity);
     Assert.assertEquals(requestExecution.getBatch().getBatchSettings()
       .getTaskFailureToleranceLimit(), scheduleEntity.getBatchTolerationLimit());
+    Assert.assertEquals(requestExecution.getBatch().getBatchSettings()
+      .getTaskFailureToleranceLimitPerBatch(), scheduleEntity.getBatchTolerationLimitPerBatch());
     Assert.assertEquals(scheduleEntity.getRequestScheduleBatchRequestEntities().size(), 2);
     Collection<RequestScheduleBatchRequestEntity> batchRequestEntities =
       scheduleEntity.getRequestScheduleBatchRequestEntities();
@@ -236,6 +239,8 @@ public class RequestExecutionTest {
     Assert.assertNotNull(scheduleEntity);
     Assert.assertEquals(requestExecution.getBatch().getBatchSettings()
       .getTaskFailureToleranceLimit(), scheduleEntity.getBatchTolerationLimit());
+    Assert.assertEquals(requestExecution.getBatch().getBatchSettings()
+      .getTaskFailureToleranceLimitPerBatch(), scheduleEntity.getBatchTolerationLimitPerBatch());
     Assert.assertEquals(scheduleEntity.getRequestScheduleBatchRequestEntities().size(), 2);
     Collection<RequestScheduleBatchRequestEntity> batchRequestEntities =
       scheduleEntity.getRequestScheduleBatchRequestEntities();
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java
index 9c01e96..558800f 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java
@@ -63,6 +63,7 @@ public class BatchRequestJobTest {
 
     HashMap<String, Integer> taskCounts = new HashMap<String, Integer>()
     {{ put(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY, 0);
+      put(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_IN_CURRENT_BATCH_KEY, 0);
       put(BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY, 0); }};
 
 
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog280Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog280Test.java
new file mode 100644
index 0000000..ae6f300
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog280Test.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.upgrade;
+
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.newCapture;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import org.apache.ambari.server.orm.DBAccessor;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
+import org.easymock.EasyMockSupport;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+import com.google.inject.Injector;
+
+public class UpgradeCatalog280Test {
+
+  private Injector injector;
+  private DBAccessor dbAccessor;
+
+  @Before
+  public void init() {
+    final EasyMockSupport easyMockSupport = new EasyMockSupport();
+    injector = easyMockSupport.createNiceMock(Injector.class);
+    dbAccessor = easyMockSupport.createNiceMock(DBAccessor.class);
+  }
+
+  @Test
+  public void testExecuteDDLUpdates() throws Exception {
+
+    Capture<DBAccessor.DBColumnInfo> perBatchLimitColumn = newCapture(CaptureType.ALL);
+    dbAccessor.addColumn(eq("requestschedule"), capture(perBatchLimitColumn));
+    expectLastCall().once();
+
+
+    replay(dbAccessor, injector);
+
+    UpgradeCatalog280 upgradeCatalog280 = new UpgradeCatalog280(injector);
+    upgradeCatalog280.dbAccessor = dbAccessor;
+    upgradeCatalog280.executeDDLUpdates();
+
+    DBAccessor.DBColumnInfo capturedBlueprintProvisioningStateColumn =
+        perBatchLimitColumn.getValue();
+    Assert.assertEquals("batch_toleration_limit_per_batch",
+        capturedBlueprintProvisioningStateColumn.getName());
+    Assert.assertEquals(null, capturedBlueprintProvisioningStateColumn.getDefaultValue());
+    Assert.assertEquals(Short.class, capturedBlueprintProvisioningStateColumn.getType());
+
+    verify(dbAccessor);
+  }
+}