You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ds...@apache.org on 2018/09/26 10:54:04 UTC
[ambari] branch trunk updated: [AMBARI-24682] Rolling Restarts:
Option to specify number of failures per batch (dsen) (#2371)
This is an automated email from the ASF dual-hosted git repository.
dsen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 46711f8 [AMBARI-24682] Rolling Restarts: Option to specify number of failures per batch (dsen) (#2371)
46711f8 is described below
commit 46711f8259e388868ce2a58845e7edc92c199b1a
Author: Dmitry Sen <ds...@apache.org>
AuthorDate: Wed Sep 26 13:53:58 2018 +0300
[AMBARI-24682] Rolling Restarts: Option to specify number of failures per batch (dsen) (#2371)
---
.../internal/RequestScheduleResourceProvider.java | 7 ++
.../server/orm/entities/RequestScheduleEntity.java | 11 ++++
.../server/scheduler/ExecutionScheduleManager.java | 16 +++--
.../server/state/scheduler/BatchRequestJob.java | 7 +-
.../server/state/scheduler/BatchSettings.java | 10 +++
.../state/scheduler/RequestExecutionImpl.java | 2 +
.../ambari/server/upgrade/SchemaUpgradeHelper.java | 1 +
.../ambari/server/upgrade/UpgradeCatalog280.java | 73 +++++++++++++++++++++
.../src/main/resources/Ambari-DDL-Derby-CREATE.sql | 1 +
.../src/main/resources/Ambari-DDL-MySQL-CREATE.sql | 1 +
.../main/resources/Ambari-DDL-Oracle-CREATE.sql | 1 +
.../main/resources/Ambari-DDL-Postgres-CREATE.sql | 1 +
.../resources/Ambari-DDL-SQLAnywhere-CREATE.sql | 1 +
.../main/resources/Ambari-DDL-SQLServer-CREATE.sql | 1 +
.../scheduler/ExecutionScheduleManagerTest.java | 49 ++++++++++++++
.../ambari/server/state/RequestExecutionTest.java | 5 ++
.../state/scheduler/BatchRequestJobTest.java | 1 +
.../server/upgrade/UpgradeCatalog280Test.java | 74 ++++++++++++++++++++++
18 files changed, 256 insertions(+), 6 deletions(-)
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
index 15ae9dd..18ce73b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
@@ -78,6 +78,7 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
public static final String BATCH_SEPARATION_IN_SECONDS_PROPERTY_ID = "batch_separation_in_seconds";
public static final String TASK_FAILURE_TOLERANCE_PROPERTY_ID = "task_failure_tolerance";
+ public static final String TASK_FAILURE_TOLERANCE_PER_BATCH_PROPERTY_ID = "task_failure_tolerance_per_batch";
public static final String TASK_FAILURE_TOLERANCE_LIMIT_PROPERTY_ID = "task_failure_tolerance_limit";
public static final String REQUESTS_PROPERTY_ID = "requests";
@@ -122,6 +123,7 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
BATCH_SEPARATION_IN_SECONDS_PROPERTY_ID);
public static final String TASK_FAILURE_TOLERANCE = PropertyHelper.getPropertyId(BATCH_SETTINGS,
TASK_FAILURE_TOLERANCE_PROPERTY_ID);
+ public static final String TASK_FAILURE_TOLERANCE_PER_BATCH = PropertyHelper.getPropertyId(BATCH_SETTINGS, TASK_FAILURE_TOLERANCE_PER_BATCH_PROPERTY_ID);
public static final String REQUESTS = PropertyHelper.getPropertyId(null, REQUESTS_PROPERTY_ID);
public static final String TYPE = PropertyHelper.getPropertyId(null, TYPE_PROPERTY_ID);
@@ -164,6 +166,7 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
UPDATE_TIME,
BATCH_SEPARATION_IN_SECONDS,
TASK_FAILURE_TOLERANCE,
+ TASK_FAILURE_TOLERANCE_PER_BATCH,
REQUESTS,
TYPE,
URI,
@@ -633,6 +636,10 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
(TASK_FAILURE_TOLERANCE)) {
batchSettings.setTaskFailureToleranceLimit(Integer.valueOf
((String) batchMapEntry.getValue()));
+ } else if (batchMapEntry.getKey().equals
+ (TASK_FAILURE_TOLERANCE_PER_BATCH)) {
+ batchSettings.setTaskFailureToleranceLimitPerBatch(Integer.valueOf
+ ((String) batchMapEntry.getValue()));
} else if (batchMapEntry.getKey().equals
(BATCH_SEPARATION_IN_SECONDS)) {
batchSettings.setBatchSeparationInSeconds(Integer.valueOf
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleEntity.java
index 84196a7..8270d21 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleEntity.java
@@ -70,6 +70,9 @@ public class RequestScheduleEntity {
@Column(name = "batch_toleration_limit")
private Integer batchTolerationLimit;
+ @Column(name = "batch_toleration_limit_per_batch")
+ private Integer batchTolerationLimitPerBatch;
+
@Column(name = "authenticated_user_id")
private Integer authenticatedUserId;
@@ -327,4 +330,12 @@ public class RequestScheduleEntity {
result = 31 * result + clusterId.hashCode();
return result;
}
+
+ public Integer getBatchTolerationLimitPerBatch() {
+ return batchTolerationLimitPerBatch;
+ }
+
+ public void setBatchTolerationLimitPerBatch(Integer batchTolerationLimitPerBatch) {
+ this.batchTolerationLimitPerBatch = batchTolerationLimitPerBatch;
+ }
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
index e804961..4f03a3e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
@@ -712,13 +712,19 @@ public class ExecutionScheduleManager {
}
BatchSettings batchSettings = requestExecution.getBatch().getBatchSettings();
- if (batchSettings != null
- && batchSettings.getTaskFailureToleranceLimit() != null) {
- return taskCounts.get(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY) >
- batchSettings.getTaskFailureToleranceLimit();
+
+ boolean result = false;
+ if (batchSettings != null) {
+ if (batchSettings.getTaskFailureToleranceLimit() != null) {
+ result = taskCounts.get(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY) > batchSettings.getTaskFailureToleranceLimit();
+ }
+ if (batchSettings.getTaskFailureToleranceLimitPerBatch() != null) {
+ result = result || taskCounts.get(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_IN_CURRENT_BATCH_KEY) >
+ batchSettings.getTaskFailureToleranceLimitPerBatch();
+ }
}
- return false;
+ return result;
}
/**
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
index bbe8e52..56f121f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
@@ -45,6 +45,8 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
"BatchRequestJob.ClusterName";
public static final String BATCH_REQUEST_FAILED_TASKS_KEY =
"BatchRequestJob.FailedTaskCount";
+ public static final String BATCH_REQUEST_FAILED_TASKS_IN_CURRENT_BATCH_KEY =
+ "BatchRequestJob.FailedTaskInCurrentBatchCount";
public static final String BATCH_REQUEST_TOTAL_TASKS_KEY =
"BatchRequestJob.TotalTaskCount";
@@ -110,7 +112,8 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
throw new AmbariException("Task failure tolerance limit exceeded"
+ ", execution_id = " + executionId
+ ", processed batch_id = " + batchId
- + ", failed tasks = " + aggregateCounts.get(BATCH_REQUEST_FAILED_TASKS_KEY)
+ + ", failed tasks in current batch = " + aggregateCounts.get(BATCH_REQUEST_FAILED_TASKS_IN_CURRENT_BATCH_KEY)
+ + ", failed tasks total = " + aggregateCounts.get(BATCH_REQUEST_FAILED_TASKS_KEY)
+ ", total tasks completed = " + aggregateCounts.get(BATCH_REQUEST_TOTAL_TASKS_KEY));
}
}
@@ -153,9 +156,11 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
batchRequestResponse.getTotalTaskCount();
taskCounts.put(BATCH_REQUEST_FAILED_TASKS_KEY, failedCount);
+ taskCounts.put(BATCH_REQUEST_FAILED_TASKS_IN_CURRENT_BATCH_KEY, batchRequestResponse.getFailedTaskCount());
taskCounts.put(BATCH_REQUEST_TOTAL_TASKS_KEY, totalCount);
properties.put(BATCH_REQUEST_FAILED_TASKS_KEY, failedCount);
+ properties.put(BATCH_REQUEST_FAILED_TASKS_IN_CURRENT_BATCH_KEY, batchRequestResponse.getFailedTaskCount());
properties.put(BATCH_REQUEST_TOTAL_TASKS_KEY, totalCount);
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchSettings.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchSettings.java
index d1ce992..c7074bb 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchSettings.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchSettings.java
@@ -23,6 +23,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
public class BatchSettings {
private Integer batchSeparationInSeconds;
private Integer taskFailureTolerance;
+ private Integer taskFailureTolerancePerBatch;
@JsonSerialize(include = JsonSerialize.Inclusion.NON_EMPTY)
@JsonProperty("batch_separation_in_seconds")
@@ -44,4 +45,13 @@ public class BatchSettings {
this.taskFailureTolerance = taskFailureTolerance;
}
+ @JsonSerialize(include = JsonSerialize.Inclusion.NON_EMPTY)
+ @JsonProperty("task_failure_tolerance_limit_per_batch")
+ public Integer getTaskFailureToleranceLimitPerBatch() {
+ return taskFailureTolerancePerBatch;
+ }
+
+ public void setTaskFailureToleranceLimitPerBatch(Integer taskFailureTolerancePerBatch) {
+ this.taskFailureTolerancePerBatch = taskFailureTolerancePerBatch;
+ }
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
index 3436203..104ca9b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
@@ -104,6 +104,7 @@ public class RequestExecutionImpl implements RequestExecution {
BatchSettings batchSettings = new BatchSettings();
batchSettings.setBatchSeparationInSeconds(requestScheduleEntity.getBatchSeparationInSeconds());
batchSettings.setTaskFailureToleranceLimit(requestScheduleEntity.getBatchTolerationLimit());
+ batchSettings.setTaskFailureToleranceLimitPerBatch(requestScheduleEntity.getBatchTolerationLimitPerBatch());
batch.setBatchSettings(batchSettings);
@@ -314,6 +315,7 @@ public class RequestExecutionImpl implements RequestExecution {
if (settings != null) {
requestScheduleEntity.setBatchSeparationInSeconds(settings.getBatchSeparationInSeconds());
requestScheduleEntity.setBatchTolerationLimit(settings.getTaskFailureToleranceLimit());
+ requestScheduleEntity.setBatchTolerationLimitPerBatch(settings.getTaskFailureToleranceLimitPerBatch());
}
}
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java
index 1b3aedf..7563975 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java
@@ -191,6 +191,7 @@ public class SchemaUpgradeHelper {
catalogBinder.addBinding().to(UpgradeCatalog270.class);
catalogBinder.addBinding().to(UpgradeCatalog271.class);
catalogBinder.addBinding().to(UpgradeCatalog272.class);
+ catalogBinder.addBinding().to(UpgradeCatalog280.class);
catalogBinder.addBinding().to(UpdateAlertScriptPaths.class);
catalogBinder.addBinding().to(FinalUpgradeCatalog.class);
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog280.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog280.java
new file mode 100644
index 0000000..b257f2d
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog280.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.upgrade;
+
+import java.sql.SQLException;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.orm.DBAccessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+
+
+/**
+ * The {@link UpgradeCatalog280} upgrades Ambari from 2.7.2 to 2.8.0.
+ */
+public class UpgradeCatalog280 extends AbstractUpgradeCatalog {
+
+ private static final Logger LOG = LoggerFactory.getLogger(UpgradeCatalog280.class);
+
+ private static final String REQUEST_SCHEDULE_TABLE_NAME = "requestschedule";
+ private static final String REQUEST_SCHEDULE_BATCH_TOLERATION_LIMIT_PER_BATCH_COLUMN_NAME = "batch_toleration_limit_per_batch";
+ @Inject
+ public UpgradeCatalog280(Injector injector) {
+ super(injector);
+ }
+
+ @Override
+ public String getSourceVersion() {
+ return "2.7.2";
+ }
+
+ @Override
+ public String getTargetVersion() {
+ return "2.8.0";
+ }
+
+ @Override
+ protected void executeDDLUpdates() throws AmbariException, SQLException {
+ addComulnsToRequestscheduleTable();
+ }
+
+ @Override
+ protected void executePreDMLUpdates() throws AmbariException, SQLException {
+ }
+
+ @Override
+ protected void executeDMLUpdates() throws AmbariException, SQLException {
+ }
+
+ protected void addComulnsToRequestscheduleTable() throws SQLException {
+ dbAccessor.addColumn(REQUEST_SCHEDULE_TABLE_NAME,
+ new DBAccessor.DBColumnInfo(REQUEST_SCHEDULE_BATCH_TOLERATION_LIMIT_PER_BATCH_COLUMN_NAME, Short.class, null,
+ null, true));
+ }
+}
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
index 80af3a1..064807b 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
@@ -344,6 +344,7 @@ CREATE TABLE requestschedule (
status varchar(255),
batch_separation_seconds smallint,
batch_toleration_limit smallint,
+ batch_toleration_limit_per_batch smallint,
authenticated_user_id INTEGER,
create_user varchar(255),
create_timestamp bigint,
diff --git a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
index bd2edf1..ccc0851 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
@@ -365,6 +365,7 @@ CREATE TABLE requestschedule (
status varchar(255),
batch_separation_seconds smallint,
batch_toleration_limit smallint,
+ batch_toleration_limit_per_batch smallint,
authenticated_user_id INTEGER,
create_user varchar(255),
create_timestamp bigint,
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
index c4fc7fb..5f315f1 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
@@ -345,6 +345,7 @@ CREATE TABLE requestschedule (
status VARCHAR2(255),
batch_separation_seconds smallint,
batch_toleration_limit smallint,
+ batch_toleration_limit_per_batch smallint,
authenticated_user_id NUMBER(10),
create_user VARCHAR2(255),
create_timestamp NUMBER(19),
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
index 7fd9e68..f40f940 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
@@ -346,6 +346,7 @@ CREATE TABLE requestschedule (
status varchar(255),
batch_separation_seconds smallint,
batch_toleration_limit smallint,
+ batch_toleration_limit_per_batch smallint,
authenticated_user_id INTEGER,
create_user varchar(255),
create_timestamp bigint,
diff --git a/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
index 6303cb1..f957031 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
@@ -343,6 +343,7 @@ CREATE TABLE requestschedule (
status VARCHAR(255),
batch_separation_seconds smallint,
batch_toleration_limit smallint,
+ batch_toleration_limit_per_batch smallint,
authenticated_user_id INTEGER,
create_user VARCHAR(255),
create_timestamp NUMERIC(19),
diff --git a/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
index f080e51..ba59961 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
@@ -349,6 +349,7 @@ CREATE TABLE requestschedule (
STATUS VARCHAR(255),
batch_separation_seconds SMALLINT,
batch_toleration_limit SMALLINT,
+ batch_toleration_limit_per_batch SMALLINT,
authenticated_user_id INTEGER,
create_user VARCHAR(255),
create_timestamp BIGINT,
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
index bc1ab47..3f4a5d5 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
@@ -172,6 +172,7 @@ public class ExecutionScheduleManagerTest {
BatchSettings batchSettings = new BatchSettings();
batchSettings.setTaskFailureToleranceLimit(10);
+ batchSettings.setTaskFailureToleranceLimitPerBatch(1);
batches.setBatchSettings(batchSettings);
List<BatchRequest> batchRequests = new ArrayList<>();
@@ -512,6 +513,7 @@ public class ExecutionScheduleManagerTest {
BatchSettings batchSettings = new BatchSettings();
batchSettings.setTaskFailureToleranceLimit(1);
+ batchSettings.setTaskFailureToleranceLimitPerBatch(1);
Map<Long, RequestExecution> executionMap = new HashMap<>();
executionMap.put(executionId, requestExecutionMock);
@@ -542,6 +544,53 @@ public class ExecutionScheduleManagerTest {
executionSchedulerMock, batchMock);
}
+ @Test
+ public void testHasToleranceThresholdPerBatchExceeded() throws Exception {
+ Clusters clustersMock = createMock(Clusters.class);
+ Cluster clusterMock = createMock(Cluster.class);
+ Configuration configurationMock = createNiceMock(Configuration.class);
+ ExecutionScheduler executionSchedulerMock = createMock(ExecutionScheduler.class);
+ InternalTokenStorage tokenStorageMock = createMock(InternalTokenStorage.class);
+ ActionDBAccessor actionDBAccessorMock = createMock(ActionDBAccessor.class);
+ Gson gson = new Gson();
+ RequestExecution requestExecutionMock = createMock(RequestExecution.class);
+ Batch batchMock = createMock(Batch.class);
+
+ long executionId = 11L;
+ String clusterName = "c1";
+
+ BatchSettings batchSettings = new BatchSettings();
+ batchSettings.setTaskFailureToleranceLimitPerBatch(1);
+
+ Map<Long, RequestExecution> executionMap = new HashMap<>();
+ executionMap.put(executionId, requestExecutionMock);
+
+ expect(clustersMock.getCluster(clusterName)).andReturn(clusterMock).anyTimes();
+ expect(clusterMock.getAllRequestExecutions()).andReturn(executionMap).anyTimes();
+ expect(requestExecutionMock.getBatch()).andReturn(batchMock).anyTimes();
+ expect(batchMock.getBatchSettings()).andReturn(batchSettings).anyTimes();
+
+ replay(clustersMock, clusterMock, configurationMock, requestExecutionMock,
+ executionSchedulerMock, batchMock);
+
+ ExecutionScheduleManager scheduleManager =
+ new ExecutionScheduleManager(configurationMock, executionSchedulerMock,
+ tokenStorageMock, clustersMock, actionDBAccessorMock, gson);
+
+ HashMap<String, Integer> taskCounts = new HashMap<String, Integer>() {{
+ put(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_IN_CURRENT_BATCH_KEY, 2);
+ put(BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY, 10);
+ }};
+
+ boolean exceeded = scheduleManager.hasToleranceThresholdExceeded
+ (executionId, clusterName, taskCounts);
+
+ Assert.assertTrue(exceeded);
+
+ verify(clustersMock, clusterMock, configurationMock, requestExecutionMock,
+ executionSchedulerMock, batchMock);
+ }
+
@SuppressWarnings("unchecked")
@Test
public void testFinalizeBatch() throws Exception {
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
index 711cb7a..bdcdb76 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
@@ -88,6 +88,7 @@ public class RequestExecutionTest {
BatchSettings batchSettings = new BatchSettings();
batchSettings.setTaskFailureToleranceLimit(10);
+ batchSettings.setTaskFailureToleranceLimitPerBatch(2);
batches.setBatchSettings(batchSettings);
List<BatchRequest> batchRequests = new ArrayList<>();
@@ -132,6 +133,8 @@ public class RequestExecutionTest {
Assert.assertNotNull(scheduleEntity);
Assert.assertEquals(requestExecution.getBatch().getBatchSettings()
.getTaskFailureToleranceLimit(), scheduleEntity.getBatchTolerationLimit());
+ Assert.assertEquals(requestExecution.getBatch().getBatchSettings()
+ .getTaskFailureToleranceLimitPerBatch(), scheduleEntity.getBatchTolerationLimitPerBatch());
Assert.assertEquals(scheduleEntity.getRequestScheduleBatchRequestEntities().size(), 2);
Collection<RequestScheduleBatchRequestEntity> batchRequestEntities =
scheduleEntity.getRequestScheduleBatchRequestEntities();
@@ -236,6 +239,8 @@ public class RequestExecutionTest {
Assert.assertNotNull(scheduleEntity);
Assert.assertEquals(requestExecution.getBatch().getBatchSettings()
.getTaskFailureToleranceLimit(), scheduleEntity.getBatchTolerationLimit());
+ Assert.assertEquals(requestExecution.getBatch().getBatchSettings()
+ .getTaskFailureToleranceLimitPerBatch(), scheduleEntity.getBatchTolerationLimitPerBatch());
Assert.assertEquals(scheduleEntity.getRequestScheduleBatchRequestEntities().size(), 2);
Collection<RequestScheduleBatchRequestEntity> batchRequestEntities =
scheduleEntity.getRequestScheduleBatchRequestEntities();
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java
index 9c01e96..558800f 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java
@@ -63,6 +63,7 @@ public class BatchRequestJobTest {
HashMap<String, Integer> taskCounts = new HashMap<String, Integer>()
{{ put(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY, 0);
+ put(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_IN_CURRENT_BATCH_KEY, 0);
put(BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY, 0); }};
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog280Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog280Test.java
new file mode 100644
index 0000000..ae6f300
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog280Test.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.upgrade;
+
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.newCapture;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import org.apache.ambari.server.orm.DBAccessor;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
+import org.easymock.EasyMockSupport;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+import com.google.inject.Injector;
+
+public class UpgradeCatalog280Test {
+
+ private Injector injector;
+ private DBAccessor dbAccessor;
+
+ @Before
+ public void init() {
+ final EasyMockSupport easyMockSupport = new EasyMockSupport();
+ injector = easyMockSupport.createNiceMock(Injector.class);
+ dbAccessor = easyMockSupport.createNiceMock(DBAccessor.class);
+ }
+
+ @Test
+ public void testExecuteDDLUpdates() throws Exception {
+
+ Capture<DBAccessor.DBColumnInfo> perBatchLimitColumn = newCapture(CaptureType.ALL);
+ dbAccessor.addColumn(eq("requestschedule"), capture(perBatchLimitColumn));
+ expectLastCall().once();
+
+
+ replay(dbAccessor, injector);
+
+ UpgradeCatalog280 upgradeCatalog280 = new UpgradeCatalog280(injector);
+ upgradeCatalog280.dbAccessor = dbAccessor;
+ upgradeCatalog280.executeDDLUpdates();
+
+ DBAccessor.DBColumnInfo capturedBlueprintProvisioningStateColumn =
+ perBatchLimitColumn.getValue();
+ Assert.assertEquals("batch_toleration_limit_per_batch",
+ capturedBlueprintProvisioningStateColumn.getName());
+ Assert.assertEquals(null, capturedBlueprintProvisioningStateColumn.getDefaultValue());
+ Assert.assertEquals(Short.class, capturedBlueprintProvisioningStateColumn.getType());
+
+ verify(dbAccessor);
+ }
+}