You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/01/26 10:22:17 UTC

[ambari] branch trunk updated: AMBARI-22799 - define scheduling of archiving Infra Solr Documents

This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 19a8e75  AMBARI-22799 - define scheduling of archiving Infra Solr Documents
19a8e75 is described below

commit 19a8e75d3887d9354a3b14a36c362f73d962e9bf
Author: kkasa <ka...@gmail.com>
AuthorDate: Tue Jan 16 16:13:24 2018 +0100

    AMBARI-22799 - define scheduling of archiving Infra Solr Documents
---
 .../org/apache/ambari/infra/HttpResponse.java}     |  20 +++-
 .../java/org/apache/ambari/infra/InfraClient.java  |  32 ++++--
 .../org/apache/ambari/infra/JobExecutionInfo.java} |  28 ++++-
 .../ambari/infra/steps/AbstractInfraSteps.java     |  12 ++-
 .../apache/ambari/infra/steps/ExportJobsSteps.java |  66 +++++++++---
 .../test/resources/stories/infra_api_tests.story   |  26 +++--
 .../ambari-infra-manager/docs/api/swagger.yaml     |   2 +-
 .../InfraManagerSchedulingConfig.java}             |  15 ++-
 .../infra/conf/batch/InfraManagerBatchConfig.java  |  25 +++--
 .../infra/job/AbstractJobsConfiguration.java       |  78 ++++++++++++++
 ...ertyMap.java => JobConfigurationException.java} |   8 +-
 ...{PropertyMap.java => JobContextRepository.java} |   7 +-
 .../ambari/infra/job/JobContextRepositoryImpl.java |  52 ++++++++++
 .../org/apache/ambari/infra/job/JobProperties.java |  35 +++++++
 .../org/apache/ambari/infra/job/JobScheduler.java  |  89 ++++++++++++++++
 .../{JobPropertyMap.java => JobsPropertyMap.java}  |  12 ++-
 ...{PropertyMap.java => SchedulingProperties.java} |  30 +++++-
 .../archive/DocumentArchivingConfiguration.java    |  59 +++++------
 .../job/archive/DocumentArchivingProperties.java   |   1 +
 .../job/archive/DocumentArchivingPropertyMap.java  |   8 +-
 .../ambari/infra/job/archive/DocumentExporter.java |  22 +++-
 .../infra/job/archive/FileNameSuffixFormatter.java |   2 +-
 .../deleting/DocumentDeletingConfiguration.java    |  53 ++++------
 .../job/deleting/DocumentDeletingPropertyMap.java  |   8 +-
 .../apache/ambari/infra/manager/JobManager.java    |  64 +++++++-----
 .../java/org/apache/ambari/infra/manager/Jobs.java |  42 ++++++++
 .../org/apache/ambari/infra/rest/JobResource.java  |  28 +++--
 .../src/main/resources/infra-manager.properties    |   8 +-
 .../src/main/resources/log4j2.xml                  |   3 +
 .../apache/ambari/infra/job/JobSchedulerTest.java  | 114 +++++++++++++++++++++
 .../infra/job/archive/DocumentExporterTest.java    | 110 ++++++++++++++++----
 .../vagrant-infra-manager.properties.sample        |  30 +++---
 32 files changed, 863 insertions(+), 226 deletions(-)

diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/HttpResponse.java
similarity index 72%
copy from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
copy to ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/HttpResponse.java
index e7b9e77..3d8711b 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/HttpResponse.java
@@ -16,10 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.ambari.infra.job;
+package org.apache.ambari.infra;
 
-import java.util.Map;
+public class HttpResponse {
+  private final int code;
+  private final String body;
 
-public interface PropertyMap<T extends JobProperties<T>> {
-  Map<String, T> getPropertyMap();
+  public HttpResponse(int code, String body) {
+    this.code = code;
+    this.body = body;
+  }
+
+  public int getCode() {
+    return code;
+  }
+
+  public String getBody() {
+    return body;
+  }
 }
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
index b798ce1..0118c76 100644
--- a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.io.IOUtils;
 import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpRequestBase;
@@ -71,11 +72,14 @@ public class InfraClient implements AutoCloseable {
     execute(new HttpGet(baseUrl));
   }
 
-  private String execute(HttpRequestBase post) {
+  private HttpResponse execute(HttpRequestBase post) {
     try (CloseableHttpResponse response = httpClient.execute(post)) {
       String responseBodyText = IOUtils.toString(response.getEntity().getContent(), Charset.defaultCharset());
-      LOG.info("Response code {} body {} ", response.getStatusLine().getStatusCode(), responseBodyText);
-      return responseBodyText;
+      int statusCode = response.getStatusLine().getStatusCode();
+      LOG.info("Response code {} body {} ", statusCode, responseBodyText);
+      if (!(200 <= statusCode && statusCode <= 299))
+        throw new RuntimeException("Error while executing http request: " + responseBodyText);
+      return new HttpResponse(statusCode, responseBodyText);
     } catch (ClientProtocolException e) {
       throw new RuntimeException(e);
     } catch (IOException e) {
@@ -83,16 +87,16 @@ public class InfraClient implements AutoCloseable {
     }
   }
 
-  public String startJob(String jobName, String parameters) {
+  public JobExecutionInfo startJob(String jobName, String parameters) {
     URIBuilder uriBuilder = new URIBuilder(baseUrl);
     uriBuilder.setScheme("http");
     uriBuilder.setPath(uriBuilder.getPath() + "/" + jobName);
     if (!isBlank(parameters))
       uriBuilder.addParameter("params", parameters);
     try {
-      String responseText = execute(new HttpPost(uriBuilder.build()));
+      String responseText = execute(new HttpPost(uriBuilder.build())).getBody();
       Map<String, Object> responseContent = new ObjectMapper().readValue(responseText, new TypeReference<HashMap<String,Object>>() {});
-      return responseContent.get("jobId").toString();
+      return new JobExecutionInfo(responseContent.get("jobId").toString(), ((Map)responseContent.get("jobExecutionData")).get("id").toString());
     } catch (URISyntaxException | JsonParseException | JsonMappingException e) {
       throw new RuntimeException(e);
     } catch (IOException e) {
@@ -106,7 +110,21 @@ public class InfraClient implements AutoCloseable {
     uriBuilder.setPath(String.format("%s/%s/%s/executions", uriBuilder.getPath(), jobName, jobId));
     uriBuilder.addParameter("operation", "RESTART");
     try {
-      execute(new HttpPost(uriBuilder.build()));
+      HttpResponse httpResponse = execute(new HttpPost(uriBuilder.build()));
+      if (httpResponse.getCode() != 200)
+        throw new RuntimeException(httpResponse.getBody());
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void stopJob(String jobExecutionId) {
+    URIBuilder uriBuilder = new URIBuilder(baseUrl);
+    uriBuilder.setScheme("http");
+    uriBuilder.setPath(String.format("%s/executions/%s", uriBuilder.getPath(), jobExecutionId));
+    uriBuilder.addParameter("operation", "STOP");
+    try {
+      execute(new HttpDelete(uriBuilder.build()));
     } catch (URISyntaxException e) {
       throw new RuntimeException(e);
     }
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/JobExecutionInfo.java
similarity index 59%
copy from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
copy to ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/JobExecutionInfo.java
index e7b9e77..92b7834 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/JobExecutionInfo.java
@@ -16,10 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.ambari.infra.job;
+package org.apache.ambari.infra;
 
-import java.util.Map;
+public class JobExecutionInfo {
+  private final String jobId;
+  private final String executionId;
 
-public interface PropertyMap<T extends JobProperties<T>> {
-  Map<String, T> getPropertyMap();
+  public JobExecutionInfo(String jobId, String executionId) {
+    this.jobId = jobId;
+    this.executionId = executionId;
+  }
+
+  public String getJobId() {
+    return jobId;
+  }
+
+  public String getExecutionId() {
+    return executionId;
+  }
+
+  @Override
+  public String toString() {
+    return "JobExecutionInfo{" +
+            "jobId='" + jobId + '\'' +
+            ", executionId='" + executionId + '\'' +
+            '}';
+  }
 }
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java
index ece1c59..fb8a100 100644
--- a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java
@@ -64,6 +64,7 @@ public abstract class AbstractInfraSteps {
   private static final int FAKE_S3_PORT = 4569;
   private static final int HDFS_PORT = 9000;
   private static final String AUDIT_LOGS_COLLECTION = "audit_logs";
+  private static final String HADOOP_LOGS_COLLECTION = "hadoop_logs";
   protected static final String S3_BUCKET_NAME = "testbucket";
   private String ambariFolder;
   private String shellScriptLocation;
@@ -111,8 +112,8 @@ public abstract class AbstractInfraSteps {
             SOLR_PORT,
             AUDIT_LOGS_COLLECTION)).build();
 
-    LOG.info("Creating collection");
-    runCommand(new String[]{"docker", "exec", "docker_solr_1", "solr", "create_collection", "-c", AUDIT_LOGS_COLLECTION, "-d", "configsets/"+ AUDIT_LOGS_COLLECTION +"/conf", "-n", AUDIT_LOGS_COLLECTION + "_conf"});
+    createSolrCollection(AUDIT_LOGS_COLLECTION);
+    createSolrCollection(HADOOP_LOGS_COLLECTION);
 
     LOG.info("Initializing s3 client");
     s3client = new AmazonS3Client(new BasicAWSCredentials("remote-identity", "remote-credential"));
@@ -122,6 +123,11 @@ public abstract class AbstractInfraSteps {
     checkInfraManagerReachable();
   }
 
+  private void createSolrCollection(String collectionName) {
+    LOG.info("Creating collection");
+    runCommand(new String[]{"docker", "exec", "docker_solr_1", "solr", "create_collection", "-c", collectionName, "-d", "configsets/"+ collectionName +"/conf", "-n", collectionName + "_conf"});
+  }
+
   private void runCommand(String[] command) {
     try {
       LOG.info("Exec command: {}", StringUtils.join(command, " "));
@@ -146,7 +152,7 @@ public abstract class AbstractInfraSteps {
     });
   }
 
-  private void doWithin(int sec, String actionName, Runnable runnable) {
+  protected void doWithin(int sec, String actionName, Runnable runnable) {
     long start = currentTimeMillis();
     Exception exception;
     while (true) {
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java
index 7e54a31..d4224c6 100644
--- a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java
@@ -23,6 +23,7 @@ import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import org.apache.ambari.infra.InfraClient;
+import org.apache.ambari.infra.JobExecutionInfo;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -56,7 +57,7 @@ import static org.junit.Assert.assertThat;
 public class ExportJobsSteps extends AbstractInfraSteps {
   private static final Logger LOG = LoggerFactory.getLogger(ExportJobsSteps.class);
 
-  private Map<String, String> launchedJobs = new HashMap<>();
+  private Map<String, JobExecutionInfo> launchedJobs = new HashMap<>();
 
   @Given("$count documents in solr")
   public void addDocuments(int count) throws Exception {
@@ -85,22 +86,39 @@ public class ExportJobsSteps extends AbstractInfraSteps {
 
   @When("start $jobName job")
   public void startJob(String jobName) throws Exception {
-    startJob(jobName, null);
+    startJob(jobName, null, 0);
   }
 
-  @When("start $jobName job with parameters $parameters")
-  public void startJob(String jobName, String parameters) throws Exception {
+  @When("start $jobName job with parameters $parameters after $waitSec seconds")
+  public void startJob(String jobName, String parameters, int waitSec) throws Exception {
+    Thread.sleep(waitSec * 1000);
     try (InfraClient httpClient = getInfraClient()) {
-      String jobId = httpClient.startJob(jobName, parameters);
-      LOG.info("Job {} started jobId: {}", jobName, jobId);
-      launchedJobs.put(jobName, jobId);
+      JobExecutionInfo jobExecutionInfo = httpClient.startJob(jobName, parameters);
+      LOG.info("Job {} started: {}", jobName, jobExecutionInfo);
+      launchedJobs.put(jobName, jobExecutionInfo);
     }
   }
 
-  @When("restart $jobName job")
-  public void restartJob(String jobName) throws Exception {
+  @When("restart $jobName job within $waitSec seconds")
+  public void restartJob(String jobName, int waitSec) {
+    doWithin(waitSec, "Restarting job " + jobName, () -> {
+      try (InfraClient httpClient = getInfraClient()) {
+        httpClient.restartJob(jobName, launchedJobs.get(jobName).getJobId());
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  @When("stop job $jobName after at least $count file exists in s3 with filename containing text $text within $waitSec seconds")
+  public void stopJob(String jobName, int count, String text, int waitSec) throws Exception {
+    AmazonS3Client s3Client = getS3client();
+    ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
+    doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME)
+            && fileCountOnS3(text, s3Client, listObjectsRequest) > count);
+
     try (InfraClient httpClient = getInfraClient()) {
-      httpClient.restartJob(jobName, launchedJobs.get(jobName));
+      httpClient.stopJob(launchedJobs.get(jobName).getExecutionId());
     }
   }
 
@@ -125,9 +143,25 @@ public class ExportJobsSteps extends AbstractInfraSteps {
     AmazonS3Client s3Client = getS3client();
     ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
     doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME)
-            && s3Client.listObjects(listObjectsRequest).getObjectSummaries().stream()
-            .filter(s3ObjectSummary -> s3ObjectSummary.getKey().contains(text))
-            .count() == count);
+            && fileCountOnS3(text, s3Client, listObjectsRequest) == count);
+  }
+
+  private long fileCountOnS3(String text, AmazonS3Client s3Client, ListObjectsRequest listObjectsRequest) {
+    return s3Client.listObjects(listObjectsRequest).getObjectSummaries().stream()
+    .filter(s3ObjectSummary -> s3ObjectSummary.getKey().contains(text))
+    .count();
+  }
+
+  @Then("Less than $count files exists on s3 server with filenames containing the text $text after $waitSec seconds")
+  public void checkLessThanFileExistsOnS3(long count, String text, int waitSec) {
+    AmazonS3Client s3Client = getS3client();
+    ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
+    doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME) && between(
+            fileCountOnS3(text, s3Client, listObjectsRequest), 1L, count - 1L));
+  }
+
+  private boolean between(long count, long from, long to) {
+    return from <= count && count <= to;
   }
 
   @Then("No file exists on s3 server with filenames containing the text $text")
@@ -184,9 +218,9 @@ public class ExportJobsSteps extends AbstractInfraSteps {
     }
   }
 
-  @Then("Check $count files exists on local filesystem with filenames containing the text $text in the folder $path")
-  public void checkNumberOfFilesOnLocalFilesystem(long count, String text, String path) {
-    File destinationDirectory = new File(getLocalDataFolder(), path);
+  @Then("Check $count files exists on local filesystem with filenames containing the text $text in the folder $path for job $jobName")
+  public void checkNumberOfFilesOnLocalFilesystem(long count, String text, String path, String jobName) {
+    File destinationDirectory = new File(getLocalDataFolder(), path.replace("${jobId}", launchedJobs.get(jobName).getJobId()));
     LOG.info("Destination directory path: {}", destinationDirectory.getAbsolutePath());
     doWithin(5, "Destination directory exists", destinationDirectory::exists);
 
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story b/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story
index d1eb4a4..122a634 100644
--- a/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story
+++ b/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story
@@ -8,7 +8,7 @@ Then Check filenames contains the text audit_logs on s3 server after 20 seconds
 Scenario: Exporting 10 documents using writeBlockSize=3 produces 4 files
 
 Given 10 documents in solr with logtime from 2010-10-09T05:00:00.000Z to 2010-10-09T20:00:00.000Z
-When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-10-09T00:00:00.000Z,end=2010-10-11T00:00:00.000Z
+When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-10-09T00:00:00.000Z,end=2010-10-11T00:00:00.000Z after 2 seconds
 Then Check 4 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2010-10-09 after 20 seconds
 And solr does not contain documents between 2010-10-09T05:00:00.000Z and 2010-10-09T20:00:00.000Z after 5 seconds
 
@@ -16,7 +16,7 @@ And solr does not contain documents between 2010-10-09T05:00:00.000Z and 2010-10
 Scenario: Running archiving job with a bigger start value than end value exports and deletes 0 documents
 
 Given 10 documents in solr with logtime from 2010-01-01T05:00:00.000Z to 2010-01-04T05:00:00.000Z
-When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-01-03T05:00:00.000Z,end=2010-01-02T05:00:00.000Z
+When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-01-03T05:00:00.000Z,end=2010-01-02T05:00:00.000Z after 2 seconds
 Then No file exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2010-01-0
 And solr contains 10 documents between 2010-01-01T05:00:00.000Z and 2010-01-04T05:00:00.000Z
 
@@ -25,11 +25,11 @@ Scenario: Archiving job fails when part of the data is exported. After resolving
 
 Given 200 documents in solr with logtime from 2011-10-09T05:00:00.000Z to 2011-10-09T20:00:00.000Z
 And a file on s3 with key solr_archive_audit_logs_-_2011-10-09T08-00-00.000Z.json.tar.gz
-When start archive_audit_logs job with parameters writeBlockSize=20,start=2010-11-09T00:00:00.000Z,end=2011-10-11T00:00:00.000Z
+When start archive_audit_logs job with parameters writeBlockSize=20,start=2010-11-09T00:00:00.000Z,end=2011-10-11T00:00:00.000Z after 2 seconds
 Then Check 3 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2011-10-09 after 20 seconds
 And solr does not contain documents between 2011-10-09T05:00:00.000Z and 2011-10-09T07:59:59.999Z after 5 seconds
 When delete file with key solr_archive_audit_logs_-_2011-10-09T08-00-00.000Z.json.tar.gz from s3
-And restart archive_audit_logs job
+And restart archive_audit_logs job within 2 seconds
 Then Check 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2011-10-09 after 20 seconds
 And solr does not contain documents between 2011-10-09T05:00:00.000Z and 2011-10-09T20:00:00.000Z after 5 seconds
 
@@ -37,14 +37,14 @@ And solr does not contain documents between 2011-10-09T05:00:00.000Z and 2011-10
 Scenario: After Deleting job deletes documents from solr no document found in the specified interval
 
 Given 10 documents in solr with logtime from 2012-10-09T05:00:00.000Z to 2012-10-09T20:00:00.000Z
-When start delete_audit_logs job with parameters start=2012-10-09T05:00:00.000Z,end=2012-10-09T20:00:00.000Z
+When start delete_audit_logs job with parameters start=2012-10-09T05:00:00.000Z,end=2012-10-09T20:00:00.000Z after 2 seconds
 Then solr does not contain documents between 2012-10-09T05:00:00.000Z and 2012-10-09T20:00:00.000Z after 5 seconds
 
 
 Scenario: Archiving documents to hdfs
 
 Given 1000 documents in solr with logtime from 2014-01-04T05:00:00.000Z to 2014-01-06T20:00:00.000Z
-When start archive_audit_logs job with parameters start=2014-01-04T05:00:00.000Z,end=2014-01-06T20:00:00.000Z,destination=HDFS
+When start archive_audit_logs job with parameters start=2014-01-04T05:00:00.000Z,end=2014-01-06T20:00:00.000Z,destination=HDFS after 2 seconds
 Then Check 7 files exists on hdfs with filenames containing the text audit_logs_-_2014-01-0 in the folder /test_audit_logs after 10 seconds
 And solr does not contain documents between 2014-01-04T05:00:00.000Z and 2014-01-06T20:00:00.000Z after 10 seconds
 
@@ -52,6 +52,16 @@ And solr does not contain documents between 2014-01-04T05:00:00.000Z and 2014-01
 Scenario: Archiving documents to local filesystem
 
 Given 200 documents in solr with logtime from 2014-02-04T05:00:00.000Z to 2014-02-06T20:00:00.000Z
-When start archive_audit_logs job with parameters start=2014-02-04T05:00:00.000Z,end=2014-02-06T20:00:00.000Z,destination=LOCAL,localDestinationDirectory=/root/archive
-Then Check 2 files exists on local filesystem with filenames containing the text audit_logs_-_2014-02-0 in the folder audit_logs_8_2014-02-06T20-00-00.000Z
+When start archive_audit_logs job with parameters start=2014-02-04T05:00:00.000Z,end=2014-02-06T20:00:00.000Z,destination=LOCAL,localDestinationDirectory=/root/archive after 2 seconds
+Then Check 2 files exists on local filesystem with filenames containing the text audit_logs_-_2014-02-0 in the folder audit_logs_${jobId}_2014-02-06T20-00-00.000Z for job archive_audit_logs
 And solr does not contain documents between 2014-02-04T05:00:00.000Z and 2014-02-06T20:00:00.000Z after 10 seconds
+
+
+Scenario: Launch Archiving job. Initiate stop and check that part of the data is archived. After restart all data must be extracted.
+
+Given 200 documents in solr with logtime from 2014-03-09T05:00:00.000Z to 2014-03-09T20:00:00.000Z
+When start archive_audit_logs job with parameters writeBlockSize=20,start=2014-03-09T05:00:00.000Z,end=2014-03-09T20:00:00.000Z after 2 seconds
+And stop job archive_audit_logs after at least 1 file exists in s3 with filename containing text solr_archive_audit_logs_-_2014-03-09 within 10 seconds
+Then Less than 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2014-03-09 after 20 seconds
+When restart archive_audit_logs job within 10 seconds
+Then Check 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2014-03-09 after 20 seconds
diff --git a/ambari-infra/ambari-infra-manager/docs/api/swagger.yaml b/ambari-infra/ambari-infra-manager/docs/api/swagger.yaml
index 824629f..6fad22d 100644
--- a/ambari-infra/ambari-infra-manager/docs/api/swagger.yaml
+++ b/ambari-infra/ambari-infra-manager/docs/api/swagger.yaml
@@ -65,7 +65,7 @@ paths:
       - "jobs"
       summary: "Get job and step details for job execution instance."
       description: ""
-      operationId: "getExectionInfo"
+      operationId: "getExecutionInfo"
       produces:
       - "application/json"
       parameters:
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/InfraManagerSchedulingConfig.java
similarity index 65%
copy from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
copy to ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/InfraManagerSchedulingConfig.java
index e7b9e77..bb495a2 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/InfraManagerSchedulingConfig.java
@@ -16,10 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.ambari.infra.job;
+package org.apache.ambari.infra.conf;
 
-import java.util.Map;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 
-public interface PropertyMap<T extends JobProperties<T>> {
-  Map<String, T> getPropertyMap();
+@Configuration
+public class InfraManagerSchedulingConfig {
+  @Bean
+  public TaskScheduler taskScheduler() {
+    return new ThreadPoolTaskScheduler();
+  }
 }
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/batch/InfraManagerBatchConfig.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/batch/InfraManagerBatchConfig.java
index b1169b4..706ed8b 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/batch/InfraManagerBatchConfig.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/batch/InfraManagerBatchConfig.java
@@ -30,6 +30,7 @@ import org.springframework.batch.core.configuration.JobRegistry;
 import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
 import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
 import org.springframework.batch.core.explore.JobExplorer;
+import org.springframework.batch.core.explore.support.JobExplorerFactoryBean;
 import org.springframework.batch.core.launch.JobLauncher;
 import org.springframework.batch.core.launch.JobOperator;
 import org.springframework.batch.core.launch.support.SimpleJobLauncher;
@@ -56,7 +57,6 @@ import org.springframework.transaction.PlatformTransactionManager;
 
 import javax.inject.Inject;
 import javax.sql.DataSource;
-import java.net.MalformedURLException;
 
 @Configuration
 @EnableBatchProcessing
@@ -85,9 +85,6 @@ public class InfraManagerBatchConfig {
   @Inject
   private JobRegistry jobRegistry;
 
-  @Inject
-  private JobExplorer jobExplorer;
-
   @Bean
   public DataSource dataSource() {
     DriverManagerDataSource dataSource = new DriverManagerDataSource();
@@ -99,8 +96,7 @@ public class InfraManagerBatchConfig {
   }
 
   @Bean
-  public DataSourceInitializer dataSourceInitializer(DataSource dataSource)
-    throws MalformedURLException {
+  public DataSourceInitializer dataSourceInitializer() {
     ResourceDatabasePopulator databasePopulator = new ResourceDatabasePopulator();
     if (dropDatabaseOnStartup) {
       databasePopulator.addScript(dropRepositoryTables);
@@ -110,7 +106,7 @@ public class InfraManagerBatchConfig {
     databasePopulator.setContinueOnError(true);
 
     DataSourceInitializer initializer = new DataSourceInitializer();
-    initializer.setDataSource(dataSource);
+    initializer.setDataSource(dataSource());
     initializer.setDatabasePopulator(databasePopulator);
 
     return initializer;
@@ -125,14 +121,14 @@ public class InfraManagerBatchConfig {
   public JobRepository jobRepository() throws Exception {
     JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
     factory.setDataSource(dataSource());
-    factory.setTransactionManager(getTransactionManager());
+    factory.setTransactionManager(transactionManager());
     factory.setSerializer(executionContextSerializer());
     factory.afterPropertiesSet();
     return factory.getObject();
   }
 
   @Bean
-  public PlatformTransactionManager getTransactionManager() {
+  public PlatformTransactionManager transactionManager() {
     return new ResourcelessTransactionManager();
   }
 
@@ -148,7 +144,7 @@ public class InfraManagerBatchConfig {
   @Bean
   public JobOperator jobOperator() throws Exception {
     SimpleJobOperator jobOperator = new SimpleJobOperator();
-    jobOperator.setJobExplorer(jobExplorer);
+    jobOperator.setJobExplorer(jobExplorer());
     jobOperator.setJobLauncher(jobLauncher());
     jobOperator.setJobRegistry(jobRegistry);
     jobOperator.setJobRepository(jobRepository());
@@ -156,6 +152,15 @@ public class InfraManagerBatchConfig {
   }
 
   @Bean
+  public JobExplorer jobExplorer() throws Exception {
+    JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
+    factoryBean.setSerializer(executionContextSerializer());
+    factoryBean.setDataSource(dataSource());
+    factoryBean.afterPropertiesSet();
+    return factoryBean.getObject();
+  }
+
+  @Bean
   public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
     JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
     jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/AbstractJobsConfiguration.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/AbstractJobsConfiguration.java
new file mode 100644
index 0000000..a57d0e0
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/AbstractJobsConfiguration.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.Job;
+import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
+import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
+import org.springframework.batch.core.job.builder.JobBuilder;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
+
+import javax.annotation.PostConstruct;
+import java.util.Map;
+
+public abstract class AbstractJobsConfiguration<T extends JobProperties<T>> {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractJobsConfiguration.class);
+
+  private final Map<String, T> propertyMap;
+  private final JobScheduler scheduler;
+  private final JobBuilderFactory jobs;
+  private final JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor;
+
+  protected AbstractJobsConfiguration(Map<String, T> propertyMap, JobScheduler scheduler, JobBuilderFactory jobs, JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor) {
+    this.propertyMap = propertyMap;
+    this.scheduler = scheduler;
+    this.jobs = jobs;
+    this.jobRegistryBeanPostProcessor = jobRegistryBeanPostProcessor;
+  }
+
+  @PostConstruct
+  public void registerJobs() {
+    if (propertyMap == null)
+      return;
+
+    // Validate every configured job up front, so a single invalid entry fails before anything is registered.
+    propertyMap.forEach((jobName, properties) -> properties.validate(jobName));
+
+    for (Map.Entry<String, T> entry : propertyMap.entrySet()) {
+      if (!entry.getValue().isEnabled())
+        continue;
+      String jobName = entry.getKey();
+      LOG.info("Registering job {}", jobName);
+      Job job = buildJob(jobs.get(jobName).listener(new JobsPropertyMap<>(propertyMap)));
+      jobRegistryBeanPostProcessor.postProcessAfterInitialization(job, jobName);
+    }
+  }
+
+  @EventListener(ApplicationReadyEvent.class)
+  public void scheduleJobs() {
+    if (propertyMap == null)
+      return;
+    // Only enabled jobs whose scheduling section is present (and enabled) are handed over to the scheduler.
+    propertyMap.forEach((jobName, properties) -> {
+      if (properties.isEnabled())
+        properties.scheduling().ifPresent(schedulingProperties -> scheduler.schedule(jobName, schedulingProperties));
+    });
+  }
+
+  protected abstract Job buildJob(JobBuilder jobBuilder);
+}
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobConfigurationException.java
similarity index 84%
copy from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
copy to ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobConfigurationException.java
index e7b9e77..8c16daa 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobConfigurationException.java
@@ -18,8 +18,8 @@
  */
 package org.apache.ambari.infra.job;
 
-import java.util.Map;
-
-public interface PropertyMap<T extends JobProperties<T>> {
-  Map<String, T> getPropertyMap();
+public class JobConfigurationException extends IllegalArgumentException { // IAE subtype: handlers catching IllegalArgumentException (e.g. JobsPropertyMap.beforeJob) keep seeing validation failures
+  public JobConfigurationException(String message, Exception ex) {
+    super(message, ex); // keeps the original validation error as the cause
+  }
 }
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobContextRepository.java
similarity index 79%
copy from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
copy to ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobContextRepository.java
index e7b9e77..eb7f717 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobContextRepository.java
@@ -18,8 +18,9 @@
  */
 package org.apache.ambari.infra.job;
 
-import java.util.Map;
+import org.springframework.batch.core.StepExecution;
 
-public interface PropertyMap<T extends JobProperties<T>> {
-  Map<String, T> getPropertyMap();
+public interface JobContextRepository {
+  StepExecution getStepExecution(Long jobExecutionId, Long id);
+  void updateExecutionContext(StepExecution stepExecution);
 }
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobContextRepositoryImpl.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobContextRepositoryImpl.java
new file mode 100644
index 0000000..fbb256f
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobContextRepositoryImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job;
+
+import org.springframework.batch.admin.service.JobService;
+import org.springframework.batch.admin.service.NoSuchStepExecutionException;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.launch.NoSuchJobExecutionException;
+import org.springframework.batch.core.repository.JobRepository;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+
+@Named
+public class JobContextRepositoryImpl implements JobContextRepository {
+
+  @Inject
+  private JobRepository jobRepository;
+  @Inject
+  private JobService jobService;
+
+
+  @Override
+  public StepExecution getStepExecution(Long jobExecutionId, Long id) {
+    try {
+      return jobService.getStepExecution(jobExecutionId, id);
+    } catch (NoSuchStepExecutionException | NoSuchJobExecutionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void updateExecutionContext(StepExecution stepExecution) {
+    jobRepository.updateExecutionContext(stepExecution);
+  }
+}
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java
index 292e15e..53909ae 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java
@@ -23,14 +23,32 @@ import org.springframework.batch.core.JobParameters;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Optional;
 
 public abstract class JobProperties<T extends JobProperties<T>> {
+
+  private SchedulingProperties scheduling;
   private final Class<T> clazz;
+  private boolean enabled;
 
   protected JobProperties(Class<T> clazz) {
     this.clazz = clazz;
   }
 
+  public SchedulingProperties getScheduling() {
+    return scheduling;
+  }
+
+  public Optional<SchedulingProperties> scheduling() {
+    // Empty unless a scheduling section was configured and is switched on.
+    return Optional.ofNullable(scheduling)
+            .filter(SchedulingProperties::isEnabled);
+  }
+
+  public void setScheduling(SchedulingProperties scheduling) {
+    this.scheduling = scheduling;
+  }
+
   public T deepCopy() {
     try {
       ObjectMapper objectMapper = new ObjectMapper();
@@ -44,4 +62,21 @@ public abstract class JobProperties<T extends JobProperties<T>> {
   public abstract void apply(JobParameters jobParameters);
 
   public abstract void validate();
+
+  public void validate(String jobName) {
+    try {
+      validate();
+    }
+    catch (Exception ex) {
+      throw new JobConfigurationException(String.format("Configuration of job %s is invalid!", jobName), ex);
+    }
+  }
+
+  public boolean isEnabled() {
+    return enabled;
+  }
+
+  public void setEnabled(boolean enabled) {
+    this.enabled = enabled;
+  }
 }
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java
new file mode 100644
index 0000000..324c0b3
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job;
+
+import org.apache.ambari.infra.manager.Jobs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobParametersBuilder;
+import org.springframework.batch.core.JobParametersInvalidException;
+import org.springframework.batch.core.launch.NoSuchJobException;
+import org.springframework.batch.core.launch.NoSuchJobExecutionException;
+import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
+import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
+import org.springframework.batch.core.repository.JobRestartException;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.support.CronTrigger;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+import java.time.Duration;
+import java.time.OffsetDateTime;
+
+import static org.apache.ambari.infra.job.archive.FileNameSuffixFormatter.SOLR_DATETIME_FORMATTER;
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+@Named
+public class JobScheduler {
+  private static final Logger LOG = LoggerFactory.getLogger(JobScheduler.class);
+
+  private final TaskScheduler scheduler;
+  private final Jobs jobs;
+
+  @Inject
+  public JobScheduler(TaskScheduler scheduler, Jobs jobs) {
+    this.scheduler = scheduler;
+    this.jobs = jobs;
+  }
+
+  public void schedule(String jobName, SchedulingProperties schedulingProperties) {
+    try {
+      jobs.lastRun(jobName).ifPresent(this::restartIfFailed);
+    } catch (NoSuchJobException | NoSuchJobExecutionException e) {
+      throw new RuntimeException(e);
+    }
+
+    scheduler.schedule(() -> launchJob(jobName, schedulingProperties.getIntervalEndDelta()), new CronTrigger(schedulingProperties.getCron()));
+    LOG.info("Job {} scheduled for running. Cron: {}", jobName, schedulingProperties.getCron());
+  }
+
+  private void restartIfFailed(JobExecution jobExecution) {
+    if (!ExitStatus.FAILED.getExitCode().equals(jobExecution.getExitStatus().getExitCode()))
+      return; // compare exit codes, not references: an ExitStatus read back from the job repository is a separate instance from the ExitStatus.FAILED constant
+    try {
+      jobs.restart(jobExecution.getId());
+    } catch (JobInstanceAlreadyCompleteException | NoSuchJobException | JobExecutionAlreadyRunningException | JobRestartException | JobParametersInvalidException | NoSuchJobExecutionException e) {
+      throw new RuntimeException(e); // restart problems are surfaced unchecked, as elsewhere in this class
+    }
+  }
+
+  private void launchJob(String jobName, String endDelta) {
+    try {
+      JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
+      if (!isBlank(endDelta))
+        jobParametersBuilder.addString("end", SOLR_DATETIME_FORMATTER.format(OffsetDateTime.now().minus(Duration.parse(endDelta))));
+
+      jobs.launchJob(jobName, jobParametersBuilder.toJobParameters());
+    } catch (JobParametersInvalidException | NoSuchJobException | JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobPropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobsPropertyMap.java
similarity index 85%
rename from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobPropertyMap.java
rename to ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobsPropertyMap.java
index b5061f8..094e797 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobPropertyMap.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobsPropertyMap.java
@@ -22,11 +22,13 @@ import org.springframework.batch.core.ExitStatus;
 import org.springframework.batch.core.JobExecution;
 import org.springframework.batch.core.JobExecutionListener;
 
-public class JobPropertyMap<T extends JobProperties<T>> implements JobExecutionListener {
+import java.util.Map;
 
-  private final PropertyMap<T> propertyMap;
+public class JobsPropertyMap<T extends JobProperties<T>> implements JobExecutionListener {
 
-  public JobPropertyMap(PropertyMap<T> propertyMap) {
+  private final Map<String, T> propertyMap;
+
+  public JobsPropertyMap(Map<String, T> propertyMap) {
     this.propertyMap = propertyMap;
   }
 
@@ -34,13 +36,13 @@ public class JobPropertyMap<T extends JobProperties<T>> implements JobExecutionL
   public void beforeJob(JobExecution jobExecution) {
     try {
       String jobName = jobExecution.getJobInstance().getJobName();
-      T defaultProperties = propertyMap.getPropertyMap().get(jobName);
+      T defaultProperties = propertyMap.get(jobName);
       if (defaultProperties == null)
         throw new UnsupportedOperationException("Properties not found for job " + jobName);
 
       T properties = defaultProperties.deepCopy();
       properties.apply(jobExecution.getJobParameters());
-      properties.validate();
+      properties.validate(jobName);
       jobExecution.getExecutionContext().put("jobProperties", properties);
     }
     catch (UnsupportedOperationException | IllegalArgumentException ex) {
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java
similarity index 60%
rename from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
rename to ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java
index e7b9e77..af81b4f 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java
@@ -18,8 +18,32 @@
  */
 package org.apache.ambari.infra.job;
 
-import java.util.Map;
+public class SchedulingProperties {
+  private boolean enabled = false;
+  private String cron;
+  private String intervalEndDelta;
 
-public interface PropertyMap<T extends JobProperties<T>> {
-  Map<String, T> getPropertyMap();
+  public boolean isEnabled() {
+    return enabled;
+  }
+
+  public void setEnabled(boolean enabled) {
+    this.enabled = enabled;
+  }
+
+  public String getCron() {
+    return cron;
+  }
+
+  public void setCron(String cron) {
+    this.cron = cron;
+  }
+
+  public String getIntervalEndDelta() {
+    return intervalEndDelta;
+  }
+
+  public void setIntervalEndDelta(String intervalEndDelta) {
+    this.intervalEndDelta = intervalEndDelta;
+  }
 }
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
index 837b9c4..89f94bd 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
@@ -19,7 +19,9 @@
 package org.apache.ambari.infra.job.archive;
 
 import org.apache.ambari.infra.conf.InfraManagerDataConfig;
-import org.apache.ambari.infra.job.JobPropertyMap;
+import org.apache.ambari.infra.job.AbstractJobsConfiguration;
+import org.apache.ambari.infra.job.JobContextRepository;
+import org.apache.ambari.infra.job.JobScheduler;
 import org.apache.ambari.infra.job.ObjectSource;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -31,55 +33,41 @@ import org.springframework.batch.core.configuration.annotation.JobScope;
 import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
 import org.springframework.batch.core.configuration.annotation.StepScope;
 import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
+import org.springframework.batch.core.job.builder.JobBuilder;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
-import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 import java.io.File;
 
 import static org.apache.commons.lang.StringUtils.isBlank;
 
 @Configuration
-public class DocumentArchivingConfiguration {
+public class DocumentArchivingConfiguration extends AbstractJobsConfiguration<DocumentArchivingProperties> {
   private static final Logger LOG = LoggerFactory.getLogger(DocumentArchivingConfiguration.class);
   private static final DocumentWiper NOT_DELETE = (firstDocument, lastDocument) -> { };
 
-  @Inject
-  private DocumentArchivingPropertyMap propertyMap;
-
-  @Inject
-  private StepBuilderFactory steps;
+  private final StepBuilderFactory steps;
+  private final Step exportStep;
 
   @Inject
-  private JobBuilderFactory jobs;
-
-  @Inject
-  @Qualifier("exportStep")
-  private Step exportStep;
-
-  @Inject
-  private JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor;
-
-
-  @PostConstruct
-  public void createJobs() {
-    if (propertyMap == null || propertyMap.getSolrDataArchiving() == null)
-      return;
-
-    propertyMap.getSolrDataArchiving().values().forEach(DocumentArchivingProperties::validate);
-
-    propertyMap.getSolrDataArchiving().keySet().forEach(jobName -> {
-      LOG.info("Registering data archiving job {}", jobName);
-      Job job = logExportJob(jobName, exportStep);
-      jobRegistryBeanPostProcessor.postProcessAfterInitialization(job, jobName);
-    });
+  public DocumentArchivingConfiguration(
+          DocumentArchivingPropertyMap jobsPropertyMap,
+          JobScheduler scheduler,
+          StepBuilderFactory steps,
+          JobBuilderFactory jobs,
+          @Qualifier("exportStep") Step exportStep,
+          JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor) {
+    super(jobsPropertyMap.getSolrDataArchiving(), scheduler, jobs, jobRegistryBeanPostProcessor);
+    this.exportStep = exportStep;
+    this.steps = steps;
   }
 
-  private Job logExportJob(String jobName, Step logExportStep) {
-    return jobs.get(jobName).listener(new JobPropertyMap<>(propertyMap)).start(logExportStep).build();
+  @Override
+  protected Job buildJob(JobBuilder jobBuilder) {
+    return jobBuilder.start(exportStep).build();
   }
 
   @Bean
@@ -93,11 +81,12 @@ public class DocumentArchivingConfiguration {
   @Bean
   @StepScope
   public DocumentExporter documentExporter(DocumentItemReader documentItemReader,
-                                           @Value("#{stepExecution.jobExecution.id}") String jobId,
+                                           @Value("#{stepExecution.jobExecution.jobId}") String jobId,
                                            @Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentArchivingProperties properties,
                                            InfraManagerDataConfig infraManagerDataConfig,
                                            @Value("#{jobParameters[end]}") String intervalEnd,
-                                           DocumentWiper documentWiper) {
+                                           DocumentWiper documentWiper,
+                                           JobContextRepository jobContextRepository) {
 
     File baseDir = new File(infraManagerDataConfig.getDataFolder(), "exporting");
     CompositeFileAction fileAction = new CompositeFileAction(new TarGzCompressor());
@@ -134,7 +123,7 @@ public class DocumentArchivingConfiguration {
             documentItemReader,
             firstDocument -> new LocalDocumentItemWriter(
                     outFile(properties.getSolr().getCollection(), destinationDirectory, fileNameSuffixFormatter.format(firstDocument)), itemWriterListener),
-            properties.getWriteBlockSize());
+            properties.getWriteBlockSize(), jobContextRepository);
   }
 
   @Bean
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java
index b90402a..f8fa33b 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java
@@ -259,6 +259,7 @@ public class DocumentArchivingProperties extends JobProperties<DocumentArchiving
                   "The property hdfsDestinationDirectory can not be null or empty string when destination is set to %s!", HDFS.name()));
     }
 
+    requireNonNull(solr, "No solr query was specified for archiving job!");
     solr.validate();
   }
 }
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingPropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingPropertyMap.java
index 7d1a738..a009031 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingPropertyMap.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingPropertyMap.java
@@ -18,7 +18,6 @@
  */
 package org.apache.ambari.infra.job.archive;
 
-import org.apache.ambari.infra.job.PropertyMap;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Configuration;
 
@@ -26,7 +25,7 @@ import java.util.Map;
 
 @Configuration
 @ConfigurationProperties(prefix = "infra-manager.jobs")
-public class DocumentArchivingPropertyMap implements PropertyMap<DocumentArchivingProperties> {
+public class DocumentArchivingPropertyMap {
   private Map<String, DocumentArchivingProperties> solrDataArchiving;
 
   public Map<String, DocumentArchivingProperties> getSolrDataArchiving() {
@@ -36,9 +35,4 @@ public class DocumentArchivingPropertyMap implements PropertyMap<DocumentArchivi
   public void setSolrDataArchiving(Map<String, DocumentArchivingProperties> solrDataArchiving) {
     this.solrDataArchiving = solrDataArchiving;
   }
-
-  @Override
-  public Map<String, DocumentArchivingProperties> getPropertyMap() {
-    return getSolrDataArchiving();
-  }
 }
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java
index 6106c20..d87fdea 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java
@@ -18,6 +18,10 @@
  */
 package org.apache.ambari.infra.job.archive;
 
+import org.apache.ambari.infra.job.JobContextRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.BatchStatus;
 import org.springframework.batch.core.ExitStatus;
 import org.springframework.batch.core.StepContribution;
 import org.springframework.batch.core.StepExecution;
@@ -30,15 +34,19 @@ import org.springframework.batch.repeat.RepeatStatus;
 
 public class DocumentExporter implements Tasklet, StepExecutionListener {
 
+  private static final Logger LOG = LoggerFactory.getLogger(DocumentExporter.class);
+
   private boolean complete = false;
   private final ItemStreamReader<Document> documentReader;
   private final DocumentDestination documentDestination;
   private final int writeBlockSize;
+  private final JobContextRepository jobContextRepository;
 
-  public DocumentExporter(ItemStreamReader<Document> documentReader, DocumentDestination documentDestination, int writeBlockSize) {
+  public DocumentExporter(ItemStreamReader<Document> documentReader, DocumentDestination documentDestination, int writeBlockSize, JobContextRepository jobContextRepository) {
     this.documentReader = documentReader;
     this.documentDestination = documentDestination;
     this.writeBlockSize = writeBlockSize;
+    this.jobContextRepository = jobContextRepository;
   }
 
   @Override
@@ -58,7 +66,8 @@ public class DocumentExporter implements Tasklet, StepExecutionListener {
 
   @Override
   public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
-    ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();
+    StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
+    ExecutionContext executionContext = stepExecution.getExecutionContext();
     documentReader.open(executionContext);
 
     DocumentItemWriter writer = null;
@@ -67,10 +76,19 @@ public class DocumentExporter implements Tasklet, StepExecutionListener {
       Document document;
       while ((document = documentReader.read()) != null) {
         if (writer != null && writtenCount >= writeBlockSize) {
+          stepExecution = jobContextRepository.getStepExecution(stepExecution.getJobExecutionId(), stepExecution.getId());
+          if (stepExecution.getJobExecution().getStatus() == BatchStatus.STOPPING) {
+            LOG.info("Received stop signal.");
+            writer.revert();
+            writer = null;
+            return RepeatStatus.CONTINUABLE;
+          }
+
           writer.close();
           writer = null;
           writtenCount = 0;
           documentReader.update(executionContext);
+          jobContextRepository.updateExecutionContext(stepExecution);
         }
 
         if (writer == null)
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java
index 85ec00b..f9016e6 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java
@@ -26,7 +26,7 @@ import static org.apache.ambari.infra.job.archive.SolrDocumentIterator.SOLR_DATE
 import static org.apache.commons.lang.StringUtils.isBlank;
 
 public class FileNameSuffixFormatter {
-  private static final DateTimeFormatter SOLR_DATETIME_FORMATTER = DateTimeFormatter.ofPattern(SOLR_DATE_FORMAT_TEXT);
+  public static final DateTimeFormatter SOLR_DATETIME_FORMATTER = DateTimeFormatter.ofPattern(SOLR_DATE_FORMAT_TEXT);
 
   public static FileNameSuffixFormatter from(DocumentArchivingProperties properties) {
     return new FileNameSuffixFormatter(properties.getFileNameSuffixColumn(), properties.getFileNameSuffixDateFormat());
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java
index ce8970d..4a68c49 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java
@@ -18,9 +18,8 @@
  */
 package org.apache.ambari.infra.job.deleting;
 
-import org.apache.ambari.infra.job.JobPropertyMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.ambari.infra.job.AbstractJobsConfiguration;
+import org.apache.ambari.infra.job.JobScheduler;
 import org.springframework.batch.core.Job;
 import org.springframework.batch.core.Step;
 import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
@@ -28,50 +27,36 @@ import org.springframework.batch.core.configuration.annotation.JobScope;
 import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
 import org.springframework.batch.core.configuration.annotation.StepScope;
 import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
+import org.springframework.batch.core.job.builder.JobBuilder;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
-import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 
 @Configuration
-public class DocumentDeletingConfiguration {
-  private static final Logger LOG = LoggerFactory.getLogger(DocumentDeletingConfiguration.class);
+public class DocumentDeletingConfiguration extends AbstractJobsConfiguration<DocumentDeletingProperties> {
 
-  @Inject
-  private DocumentDeletingPropertyMap propertyMap;
-
-  @Inject
-  private StepBuilderFactory steps;
-
-  @Inject
-  private JobBuilderFactory jobs;
+  private final StepBuilderFactory steps;
+  private final Step deleteStep;
 
   @Inject
-  private JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor;
-
-  @Inject
-  @Qualifier("deleteStep")
-  private Step deleteStep;
-
-  @PostConstruct
-  public void createJobs() {
-    if (propertyMap == null || propertyMap.getSolrDataDeleting() == null)
-      return;
-
-    propertyMap.getSolrDataDeleting().values().forEach(DocumentDeletingProperties::validate);
-
-    propertyMap.getSolrDataDeleting().keySet().forEach(jobName -> {
-      LOG.info("Registering data deleting job {}", jobName);
-      Job job = logDeleteJob(jobName, deleteStep);
-      jobRegistryBeanPostProcessor.postProcessAfterInitialization(job, jobName);
-    });
+  public DocumentDeletingConfiguration(
+          DocumentDeletingPropertyMap documentDeletingPropertyMap,
+          JobScheduler scheduler,
+          StepBuilderFactory steps,
+          JobBuilderFactory jobs,
+          JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor,
+          @Qualifier("deleteStep") Step deleteStep) {
+    super(documentDeletingPropertyMap.getSolrDataDeleting(), scheduler, jobs, jobRegistryBeanPostProcessor);
+    this.steps = steps;
+    this.deleteStep = deleteStep;
   }
 
-  private Job logDeleteJob(String jobName, Step logExportStep) {
-    return jobs.get(jobName).listener(new JobPropertyMap<>(propertyMap)).start(logExportStep).build();
+  @Override
+  protected Job buildJob(JobBuilder jobBuilder) {
+    return jobBuilder.start(deleteStep).build();
   }
 
   @Bean
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingPropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingPropertyMap.java
index fccfd59..1dc0caf 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingPropertyMap.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingPropertyMap.java
@@ -18,7 +18,6 @@
  */
 package org.apache.ambari.infra.job.deleting;
 
-import org.apache.ambari.infra.job.PropertyMap;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Configuration;
 
@@ -26,7 +25,7 @@ import java.util.Map;
 
 @Configuration
 @ConfigurationProperties(prefix = "infra-manager.jobs")
-public class DocumentDeletingPropertyMap implements PropertyMap<DocumentDeletingProperties> {
+public class DocumentDeletingPropertyMap {
   private Map<String, DocumentDeletingProperties> solrDataDeleting;
 
   public Map<String, DocumentDeletingProperties> getSolrDataDeleting() {
@@ -36,9 +35,4 @@ public class DocumentDeletingPropertyMap implements PropertyMap<DocumentDeleting
   public void setSolrDataDeleting(Map<String, DocumentDeletingProperties> solrDataDeleting) {
     this.solrDataDeleting = solrDataDeleting;
   }
-
-  @Override
-  public Map<String, DocumentDeletingProperties> getPropertyMap() {
-    return getSolrDataDeleting();
-  }
 }
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java
index 862119a..f35387d 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java
@@ -18,7 +18,6 @@
  */
 package org.apache.ambari.infra.manager;
 
-import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import org.apache.ambari.infra.model.ExecutionContextResponse;
 import org.apache.ambari.infra.model.JobDetailsResponse;
@@ -36,9 +35,13 @@ import org.springframework.batch.admin.service.JobService;
 import org.springframework.batch.admin.service.NoSuchStepExecutionException;
 import org.springframework.batch.admin.web.JobInfo;
 import org.springframework.batch.admin.web.StepExecutionProgress;
-import org.springframework.batch.core.*;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobInstance;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.batch.core.JobParametersInvalidException;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.explore.JobExplorer;
 import org.springframework.batch.core.launch.JobExecutionNotRunningException;
-import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException;
 import org.springframework.batch.core.launch.JobOperator;
 import org.springframework.batch.core.launch.NoSuchJobException;
 import org.springframework.batch.core.launch.NoSuchJobExecutionException;
@@ -51,16 +54,16 @@ import javax.inject.Inject;
 import javax.inject.Named;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TimeZone;
 
 @Named
-public class JobManager {
+public class JobManager implements Jobs {
 
   private static final Logger LOG = LoggerFactory.getLogger(JobManager.class);
 
@@ -70,6 +73,9 @@ public class JobManager {
   @Inject
   private JobOperator jobOperator;
 
+  @Inject
+  private JobExplorer jobExplorer;
+
   private TimeZone timeZone = TimeZone.getDefault();
 
   public Set<String> getAllJobNames() {
@@ -80,18 +86,28 @@ public class JobManager {
    * Launch a new job instance (based on job name) and applies customized parameters to it.
    * Also add a new date parameter to make sure the job instance will be unique
    */
-  public JobExecutionInfoResponse launchJob(String jobName, String params)
-    throws JobParametersInvalidException, JobInstanceAlreadyExistsException, NoSuchJobException,
+  @Override
+  public JobExecutionInfoResponse launchJob(String jobName, JobParameters jobParameters)
+    throws JobParametersInvalidException, NoSuchJobException,
     JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
-    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
-    if (params != null) {
-      LOG.info("Parsing parameters of job {} '{}'", jobName, params);
-      Splitter.on(',')
-              .trimResults()
-              .withKeyValueSeparator(Splitter.on('=').limit(2).trimResults())
-              .split(params).entrySet().forEach(entry -> jobParametersBuilder.addString(entry.getKey(), entry.getValue()));
-    }
-    return new JobExecutionInfoResponse(jobService.launch(jobName, jobParametersBuilder.toJobParameters()), timeZone);
+
+    Set<JobExecution> running = jobExplorer.findRunningJobExecutions(jobName);
+    if (!running.isEmpty())
+      throw new JobExecutionAlreadyRunningException("An instance of this job is already active: "+jobName);
+
+    return new JobExecutionInfoResponse(jobService.launch(jobName, jobParameters), timeZone);
+  }
+
+  @Override
+  public void restart(Long jobExecutionId)
+          throws JobInstanceAlreadyCompleteException, NoSuchJobException, JobExecutionAlreadyRunningException,
+          JobParametersInvalidException, JobRestartException, NoSuchJobExecutionException {
+    jobService.restart(jobExecutionId);
+  }
+
+  @Override
+  public Optional<JobExecution> lastRun(String jobName) throws NoSuchJobException {
+    return jobService.listJobExecutionsForJob(jobName, 0, 1).stream().findFirst();
   }
 
   /**
@@ -111,19 +127,14 @@ public class JobManager {
   /**
    * Gather job execution details by job execution id.
    */
-  public JobExecutionDetailsResponse getExectionInfo(Long jobExecutionId) throws NoSuchJobExecutionException {
+  public JobExecutionDetailsResponse getExecutionInfo(Long jobExecutionId) throws NoSuchJobExecutionException {
     JobExecution jobExecution = jobService.getJobExecution(jobExecutionId);
-    List<StepExecutionInfoResponse> stepExecutionInfos = new ArrayList<StepExecutionInfoResponse>();
+    List<StepExecutionInfoResponse> stepExecutionInfoList = new ArrayList<>();
     for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
-      stepExecutionInfos.add(new StepExecutionInfoResponse(stepExecution, timeZone));
+      stepExecutionInfoList.add(new StepExecutionInfoResponse(stepExecution, timeZone));
     }
-    Collections.sort(stepExecutionInfos, new Comparator<StepExecutionInfoResponse>() {
-      @Override
-      public int compare(StepExecutionInfoResponse o1, StepExecutionInfoResponse o2) {
-        return o1.getId().compareTo(o2.getId());
-      }
-    });
-    return new JobExecutionDetailsResponse(new JobExecutionInfoResponse(jobExecution, timeZone), stepExecutionInfos);
+    stepExecutionInfoList.sort(Comparator.comparing(StepExecutionInfoResponse::getId));
+    return new JobExecutionDetailsResponse(new JobExecutionInfoResponse(jobExecution, timeZone), stepExecutionInfoList);
   }
 
   /**
@@ -139,6 +150,7 @@ public class JobManager {
     } else {
       throw new UnsupportedOperationException("Unsupported operaration");
     }
+    LOG.info("Job {} was marked {}", jobExecution.getJobInstance().getJobName(), operation.name());
     return new JobExecutionInfoResponse(jobExecution, timeZone);
   }
 
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java
new file mode 100644
index 0000000..b2ca605
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.manager;
+
+import org.apache.ambari.infra.model.JobExecutionInfoResponse;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.batch.core.JobParametersInvalidException;
+import org.springframework.batch.core.launch.NoSuchJobException;
+import org.springframework.batch.core.launch.NoSuchJobExecutionException;
+import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
+import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
+import org.springframework.batch.core.repository.JobRestartException;
+
+import java.util.Optional;
+
+public interface Jobs {
+  JobExecutionInfoResponse launchJob(String jobName, JobParameters params)
+          throws JobParametersInvalidException, NoSuchJobException,
+          JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException;
+  void restart(Long jobExecutionId)
+          throws JobInstanceAlreadyCompleteException, NoSuchJobException, JobExecutionAlreadyRunningException,
+          JobParametersInvalidException, JobRestartException, NoSuchJobExecutionException;
+
+  Optional<JobExecution> lastRun(String jobName) throws NoSuchJobException, NoSuchJobExecutionException;
+}
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/rest/JobResource.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/rest/JobResource.java
index 0e20b54..502057e 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/rest/JobResource.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/rest/JobResource.java
@@ -18,6 +18,7 @@
  */
 package org.apache.ambari.infra.rest;
 
+import com.google.common.base.Splitter;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import org.apache.ambari.infra.manager.JobManager;
@@ -35,11 +36,13 @@ import org.apache.ambari.infra.model.StepExecutionContextResponse;
 import org.apache.ambari.infra.model.StepExecutionInfoResponse;
 import org.apache.ambari.infra.model.StepExecutionProgressResponse;
 import org.apache.ambari.infra.model.StepExecutionRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.batch.admin.service.NoSuchStepExecutionException;
 import org.springframework.batch.admin.web.JobInfo;
+import org.springframework.batch.core.JobParametersBuilder;
 import org.springframework.batch.core.JobParametersInvalidException;
 import org.springframework.batch.core.launch.JobExecutionNotRunningException;
-import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException;
 import org.springframework.batch.core.launch.NoSuchJobException;
 import org.springframework.batch.core.launch.NoSuchJobExecutionException;
 import org.springframework.batch.core.launch.NoSuchJobInstanceException;
@@ -67,6 +70,7 @@ import java.util.Set;
 @Named
 @Scope("request")
 public class JobResource {
+  private static final Logger LOG = LoggerFactory.getLogger(JobResource.class);
 
   @Inject
   private JobManager jobManager;
@@ -83,9 +87,21 @@ public class JobResource {
   @Path("{jobName}")
   @ApiOperation("Start a new job instance by job name.")
   public JobExecutionInfoResponse startJob(@BeanParam @Valid JobInstanceStartRequest request)
-    throws JobParametersInvalidException, JobInstanceAlreadyExistsException, NoSuchJobException, JobExecutionAlreadyRunningException,
+    throws JobParametersInvalidException, NoSuchJobException, JobExecutionAlreadyRunningException,
     JobRestartException, JobInstanceAlreadyCompleteException {
-    return jobManager.launchJob(request.getJobName(), request.getParams());
+
+    String jobName = request.getJobName();
+    String params = request.getParams();
+    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
+    if (params != null) {
+      LOG.info("Parsing parameters of job {} '{}'", jobName, params);
+      Splitter.on(',')
+              .trimResults()
+              .withKeyValueSeparator(Splitter.on('=').limit(2).trimResults())
+              .split(params).forEach(jobParametersBuilder::addString);
+    }
+
+    return jobManager.launchJob(jobName, jobParametersBuilder.toJobParameters());
   }
 
   @GET
@@ -117,7 +133,7 @@ public class JobResource {
   @Path("/executions/{jobExecutionId}")
   @ApiOperation("Get job and step details for job execution instance.")
   public JobExecutionDetailsResponse getExectionInfo(@PathParam("jobExecutionId") @Valid Long jobExecutionId) throws NoSuchJobExecutionException {
-    return jobManager.getExectionInfo(jobExecutionId);
+    return jobManager.getExecutionInfo(jobExecutionId);
   }
 
   @GET
@@ -150,8 +166,8 @@ public class JobResource {
   @Produces({"application/json"})
   @Path("/{jobName}/{jobInstanceId}/executions")
   @ApiOperation("Get execution for job instance.")
-  public List<JobExecutionInfoResponse> getExecutionsForInstance(@BeanParam @Valid JobExecutionRequest request) throws JobInstanceAlreadyCompleteException,
-    NoSuchJobExecutionException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException, NoSuchJobException, NoSuchJobInstanceException {
+  public List<JobExecutionInfoResponse> getExecutionsForInstance(@BeanParam @Valid JobExecutionRequest request) throws
+          NoSuchJobException, NoSuchJobInstanceException {
     return jobManager.getExecutionsForJobInstance(request.getJobName(), request.getJobInstanceId());
   }
 
diff --git a/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties b/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties
index aea2b88..a0712ba 100644
--- a/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties
+++ b/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties
@@ -14,13 +14,14 @@
 # limitations under the License.
 
 infra-manager.batch.db.file=job-repository.db
-infra-manager.batch.db.init=true
+infra-manager.batch.db.init=false
 infra-manager.batch.db.username=admin
 infra-manager.batch.db.password=admin
 management.security.enabled=false
 management.health.solr.enabled=false
 infra-manager.server.data.folder=/tmp/ambariInfraManager
 
+infra-manager.jobs.solr_data_archiving.archive_service_logs.enabled=true
 infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.zoo_keeper_connection_string=zookeeper:2181
 infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.collection=hadoop_logs
 infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.query_text=logtime:[${start} TO ${end}]
@@ -33,6 +34,10 @@ infra-manager.jobs.solr_data_archiving.archive_service_logs.destination=LOCAL
 infra-manager.jobs.solr_data_archiving.archive_service_logs.local_destination_directory=/tmp/ambariInfraManager
 infra-manager.jobs.solr_data_archiving.archive_service_logs.file_name_suffix_column=logtime
 infra-manager.jobs.solr_data_archiving.archive_service_logs.file_name_suffix_date_format=yyyy-MM-dd'T'HH-mm-ss.SSSX
+infra-manager.jobs.solr_data_archiving.archive_service_logs.scheduling.enabled=true
+infra-manager.jobs.solr_data_archiving.archive_service_logs.scheduling.cron=0 * * * * ?
+infra-manager.jobs.solr_data_archiving.archive_service_logs.scheduling.intervalEndDelta=PT24H
+infra-manager.jobs.solr_data_archiving.archive_audit_logs.enabled=true
 infra-manager.jobs.solr_data_archiving.archive_audit_logs.solr.zoo_keeper_connection_string=zookeeper:2181
 infra-manager.jobs.solr_data_archiving.archive_audit_logs.solr.collection=audit_logs
 infra-manager.jobs.solr_data_archiving.archive_audit_logs.solr.query_text=logtime:[${start} TO ${end}]
@@ -63,6 +68,7 @@ infra-manager.jobs.solr_data_archiving.archive_audit_logs.s3_endpoint=http://fak
 #infra-manager.jobs.solr_data_archiving.export_ranger_audit_logs.query.filter_query_text=(logtime:"${logtime}" AND id:{"${id}" TO *]) OR logtime:{"${logtime}" TO "${end}"]
 #infra-manager.jobs.solr_data_archiving.export_ranger_audit_logs.query.sort_column[0]=logtime
 #infra-manager.jobs.solr_data_archiving.export_ranger_audit_logs.query.sort_column[1]=id
+infra-manager.jobs.solr_data_deleting.delete_audit_logs.enabled=true
 infra-manager.jobs.solr_data_deleting.delete_audit_logs.zoo_keeper_connection_string=zookeeper:2181
 infra-manager.jobs.solr_data_deleting.delete_audit_logs.collection=audit_logs
 infra-manager.jobs.solr_data_deleting.delete_audit_logs.filter_field=logtime
diff --git a/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml b/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml
index 9737554..d3db3d7 100644
--- a/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml
+++ b/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml
@@ -37,5 +37,8 @@
       <AppenderRef ref="File" />
       <AppenderRef ref="Console" />
     </Root>
+    <!--<Logger name="org.springframework.jdbc.core.JdbcTemplate" level="debug">-->
+      <!--<AppenderRef ref="Console"/>-->
+    <!--</Logger>-->
   </Loggers>
 </Configuration>
diff --git a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java
new file mode 100644
index 0000000..ba1150f
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java
@@ -0,0 +1,114 @@
+package org.apache.ambari.infra.job;
+
+import org.apache.ambari.infra.manager.Jobs;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.support.CronTrigger;
+
+import javax.batch.operations.NoSuchJobException;
+import java.util.Optional;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.isA;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@RunWith(EasyMockRunner.class)
+public class JobSchedulerTest extends EasyMockSupport {
+
+  @Mock
+  private TaskScheduler taskScheduler;
+  @Mock
+  private Jobs jobs;
+  @Mock
+  private ScheduledFuture scheduledFuture;
+  private JobScheduler jobScheduler;
+
+  @Before
+  public void setUp() throws Exception {
+    jobScheduler = new JobScheduler(taskScheduler, jobs);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    verifyAll();
+  }
+
+  @Test(expected = NoSuchJobException.class)
+  public void testScheduleWhenJobNotExistsThrowsException() throws Exception {
+    String jobName = "notFoundJob";
+    expect(jobs.lastRun(jobName)).andThrow(new NoSuchJobException());
+    replayAll();
+
+    jobScheduler.schedule(jobName, null);
+  }
+
+  @Test
+  public void testScheduleWhenNoPreviousExecutionExistsJobIsScheduled() throws Exception {
+    String jobName = "job0";
+    SchedulingProperties schedulingProperties = new SchedulingProperties();
+    schedulingProperties.setCron("* * * * * ?");
+    expect(jobs.lastRun(jobName)).andReturn(Optional.empty());
+    expect(taskScheduler.schedule(isA(Runnable.class), eq(new CronTrigger(schedulingProperties.getCron())))).andReturn(scheduledFuture);
+    replayAll();
+
+    jobScheduler.schedule(jobName, schedulingProperties);
+  }
+
+  @Test
+  public void testScheduleWhenPreviousExecutionWasSuccessfulJobIsScheduled() throws Exception {
+    String jobName = "job0";
+    SchedulingProperties schedulingProperties = new SchedulingProperties();
+    schedulingProperties.setCron("* * * * * ?");
+    JobExecution jobExecution = new JobExecution(1L, new JobParameters());
+    jobExecution.setExitStatus(ExitStatus.COMPLETED);
+    expect(jobs.lastRun(jobName)).andReturn(Optional.of(jobExecution));
+    expect(taskScheduler.schedule(isA(Runnable.class), eq(new CronTrigger(schedulingProperties.getCron())))).andReturn(scheduledFuture);
+    replayAll();
+
+    jobScheduler.schedule(jobName, schedulingProperties);
+  }
+
+  @Test
+  public void testScheduleWhenPreviousExecutionFailedJobIsRestartedAndScheduled() throws Exception {
+    String jobName = "job0";
+    SchedulingProperties schedulingProperties = new SchedulingProperties();
+    schedulingProperties.setCron("* * * * * ?");
+    JobExecution jobExecution = new JobExecution(1L, new JobParameters());
+    jobExecution.setExitStatus(ExitStatus.FAILED);
+    expect(jobs.lastRun(jobName)).andReturn(Optional.of(jobExecution));
+    jobs.restart(1L); expectLastCall();
+    expect(taskScheduler.schedule(isA(Runnable.class), eq(new CronTrigger(schedulingProperties.getCron())))).andReturn(scheduledFuture);
+    replayAll();
+
+    jobScheduler.schedule(jobName, schedulingProperties);
+  }
+}
\ No newline at end of file
diff --git a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java
index 88fbff0..b31110c 100644
--- a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java
+++ b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.ambari.infra.job.archive;
 
+import org.apache.ambari.infra.job.JobContextRepository;
 import org.easymock.EasyMockRunner;
 import org.easymock.EasyMockSupport;
 import org.easymock.Mock;
@@ -26,12 +27,14 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.springframework.batch.core.BatchStatus;
 import org.springframework.batch.core.JobExecution;
 import org.springframework.batch.core.StepExecution;
 import org.springframework.batch.core.scope.context.ChunkContext;
 import org.springframework.batch.core.scope.context.StepContext;
 import org.springframework.batch.item.ExecutionContext;
 import org.springframework.batch.item.ItemStreamReader;
+import org.springframework.batch.repeat.RepeatStatus;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -39,10 +42,20 @@ import java.util.HashMap;
 
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
 
 @RunWith(EasyMockRunner.class)
 public class DocumentExporterTest extends EasyMockSupport {
 
+  private static final long JOB_EXECUTION_ID = 1L;
+  private static final long STEP_EXECUTION_ID = 1L;
+  private static final Document DOCUMENT_2 = new Document(new HashMap<String, String>() {{
+    put("id", "2");
+  }});
+  private static final Document DOCUMENT_3 = new Document(new HashMap<String, String>() {{
+    put("id", "3");
+  }});
   private DocumentExporter documentExporter;
   @Mock
   private ItemStreamReader<Document> reader;
@@ -52,17 +65,30 @@ public class DocumentExporterTest extends EasyMockSupport {
   private DocumentItemWriter documentItemWriter;
   @Mock
   private DocumentItemWriter documentItemWriter2;
+  @Mock
+  private DocumentItemWriter documentItemWriter3;
+  @Mock
+  private JobContextRepository jobContextRepository;
 
-  private ExecutionContext executionContext;
+//  private ExecutionContext executionContext;
   private ChunkContext chunkContext;
   private static final Document DOCUMENT = new Document(new HashMap<String, String>() {{ put("id", "1"); }});
 
   @Before
   public void setUp() throws Exception {
-    StepExecution stepExecution = new StepExecution("exportDoc", new JobExecution(1L));
-    chunkContext = new ChunkContext(new StepContext(stepExecution));
-    executionContext = stepExecution.getExecutionContext();
-    documentExporter = new DocumentExporter(reader, documentDestination, 2);
+    chunkContext = chunkContext(BatchStatus.STARTED);
+    documentExporter = documentExporter(2);
+  }
+
+  private DocumentExporter documentExporter(int writeBlockSize) {
+    return new DocumentExporter(reader, documentDestination, writeBlockSize, jobContextRepository);
+  }
+
+  private ChunkContext chunkContext(BatchStatus batchStatus) {
+    StepExecution stepExecution = new StepExecution("exportDoc", new JobExecution(JOB_EXECUTION_ID));
+    stepExecution.setId(STEP_EXECUTION_ID);
+    stepExecution.getJobExecution().setStatus(batchStatus);
+    return new ChunkContext(new StepContext(stepExecution));
   }
 
   @After
@@ -72,7 +98,7 @@ public class DocumentExporterTest extends EasyMockSupport {
 
   @Test
   public void testNothingToRead() throws Exception {
-    reader.open(executionContext); expectLastCall();
+    reader.open(executionContext(chunkContext)); expectLastCall();
     expect(reader.read()).andReturn(null);
     reader.close(); expectLastCall();
     replayAll();
@@ -80,9 +106,13 @@ public class DocumentExporterTest extends EasyMockSupport {
     documentExporter.execute(null, chunkContext);
   }
 
+  private ExecutionContext executionContext(ChunkContext chunkContext) {
+    return chunkContext.getStepContext().getStepExecution().getExecutionContext();
+  }
+
   @Test
   public void testWriteLessDocumentsThanWriteBlockSize() throws Exception {
-    reader.open(executionContext); expectLastCall();
+    reader.open(executionContext(chunkContext)); expectLastCall();
     expect(reader.read()).andReturn(DOCUMENT);
     expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
     documentItemWriter.write(DOCUMENT); expectLastCall();
@@ -91,36 +121,35 @@ public class DocumentExporterTest extends EasyMockSupport {
     documentItemWriter.close(); expectLastCall();
     replayAll();
 
-    documentExporter.execute(null, chunkContext);
+    assertThat(documentExporter.execute(null, chunkContext), is(RepeatStatus.FINISHED));
   }
 
   @Test
   public void testWriteMoreDocumentsThanWriteBlockSize() throws Exception {
-    Document document2 = new Document(new HashMap<String, String>() {{ put("id", "2"); }});
-    Document document3 = new Document(new HashMap<String, String>() {{ put("id", "3"); }});
-
-    reader.open(executionContext); expectLastCall();
+    reader.open(executionContext(chunkContext)); expectLastCall();
     expect(reader.read()).andReturn(DOCUMENT);
     expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
     documentItemWriter.write(DOCUMENT); expectLastCall();
-    expect(reader.read()).andReturn(document2);
-    documentItemWriter.write(document2); expectLastCall();
-    expect(reader.read()).andReturn(document3);
+    expect(reader.read()).andReturn(DOCUMENT_2);
+    documentItemWriter.write(DOCUMENT_2); expectLastCall();
+    expect(reader.read()).andReturn(DOCUMENT_3);
     documentItemWriter.close(); expectLastCall();
-    expect(documentDestination.open(document3)).andReturn(documentItemWriter2);
-    documentItemWriter2.write(document3); expectLastCall();
+    jobContextRepository.updateExecutionContext(chunkContext.getStepContext().getStepExecution());
+    expect(jobContextRepository.getStepExecution(JOB_EXECUTION_ID, STEP_EXECUTION_ID)).andReturn(chunkContext.getStepContext().getStepExecution());
+    expect(documentDestination.open(DOCUMENT_3)).andReturn(documentItemWriter2);
+    documentItemWriter2.write(DOCUMENT_3); expectLastCall();
     expect(reader.read()).andReturn(null);
-    reader.update(executionContext);
+    reader.update(executionContext(chunkContext));
     reader.close(); expectLastCall();
     documentItemWriter2.close(); expectLastCall();
     replayAll();
 
-    documentExporter.execute(null, chunkContext);
+    assertThat(documentExporter.execute(null, chunkContext), is(RepeatStatus.FINISHED));
   }
 
   @Test(expected = IOException.class)
   public void testReadError() throws Exception {
-    reader.open(executionContext); expectLastCall();
+    reader.open(executionContext(chunkContext)); expectLastCall();
     expect(reader.read()).andReturn(DOCUMENT);
     expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
     documentItemWriter.write(DOCUMENT); expectLastCall();
@@ -134,7 +163,7 @@ public class DocumentExporterTest extends EasyMockSupport {
 
   @Test(expected = UncheckedIOException.class)
   public void testWriteError() throws Exception {
-    reader.open(executionContext); expectLastCall();
+    reader.open(executionContext(chunkContext)); expectLastCall();
     expect(reader.read()).andReturn(DOCUMENT);
     expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
     documentItemWriter.write(DOCUMENT); expectLastCall().andThrow(new UncheckedIOException(new IOException("TEST")));
@@ -144,4 +173,43 @@ public class DocumentExporterTest extends EasyMockSupport {
 
     documentExporter.execute(null, chunkContext);
   }
+
+  @Test
+  public void testStopAndRestartExportsAllDocuments() throws Exception {
+    ChunkContext stoppingChunkContext = chunkContext(BatchStatus.STOPPING);
+    DocumentExporter documentExporter = documentExporter(1);
+
+    reader.open(executionContext(chunkContext)); expectLastCall();
+    expect(reader.read()).andReturn(DOCUMENT);
+
+    expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
+    documentItemWriter.write(DOCUMENT); expectLastCall();
+    expect(reader.read()).andReturn(DOCUMENT_2);
+    expect(jobContextRepository.getStepExecution(JOB_EXECUTION_ID, STEP_EXECUTION_ID)).andReturn(chunkContext.getStepContext().getStepExecution());
+    documentItemWriter.close(); expectLastCall();
+    reader.update(executionContext(this.chunkContext));
+    jobContextRepository.updateExecutionContext(this.chunkContext.getStepContext().getStepExecution());
+
+    expect(documentDestination.open(DOCUMENT_2)).andReturn(documentItemWriter2);
+    documentItemWriter2.write(DOCUMENT_2); expectLastCall();
+    expect(reader.read()).andReturn(DOCUMENT_3);
+    expect(jobContextRepository.getStepExecution(JOB_EXECUTION_ID, STEP_EXECUTION_ID)).andReturn(stoppingChunkContext.getStepContext().getStepExecution());
+    documentItemWriter2.revert(); expectLastCall();
+    reader.close(); expectLastCall();
+
+    reader.open(executionContext(chunkContext));
+    expect(reader.read()).andReturn(DOCUMENT_3);
+    expect(documentDestination.open(DOCUMENT_3)).andReturn(documentItemWriter3);
+    documentItemWriter3.write(DOCUMENT_3); expectLastCall();
+    documentItemWriter3.close(); expectLastCall();
+
+    expect(reader.read()).andReturn(null);
+    reader.close(); expectLastCall();
+    replayAll();
+
+    RepeatStatus repeatStatus = documentExporter.execute(null, this.chunkContext);
+    assertThat(repeatStatus, is(RepeatStatus.CONTINUABLE));
+    repeatStatus = documentExporter.execute(null, this.chunkContext);
+    assertThat(repeatStatus, is(RepeatStatus.FINISHED));
+  }
 }
\ No newline at end of file
diff --git a/ambari-infra/ambari-infra-manager/src/test/resoruces/vagrant-infra-manager.properties.sample b/ambari-infra/ambari-infra-manager/src/test/resoruces/vagrant-infra-manager.properties.sample
index f008a53..d722f0e 100644
--- a/ambari-infra/ambari-infra-manager/src/test/resoruces/vagrant-infra-manager.properties.sample
+++ b/ambari-infra/ambari-infra-manager/src/test/resoruces/vagrant-infra-manager.properties.sample
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 infra-manager.batch.db.file=job-repository.db
 infra-manager.batch.db.init=true
 infra-manager.batch.db.username=admin
@@ -20,18 +21,19 @@ management.security.enabled=false
 management.health.solr.enabled=false
 infra-manager.server.data.folder=/tmp/ambariInfraManager
 
-infra-manager.jobs.solr_data_export.archive_service_logs.solr.zoo_keeper_connection_string=c6401.ambari.apache.org:2181/infra-solr
-infra-manager.jobs.solr_data_export.archive_service_logs.solr.collection=hadoop_logs
-infra-manager.jobs.solr_data_export.archive_service_logs.solr.query_text=logtime:[${start} TO ${end}]
-infra-manager.jobs.solr_data_export.archive_service_logs.solr.filter_query_text=(logtime:${logtime} AND id:{${id} TO *]) OR logtime:{${logtime} TO ${end}]
-infra-manager.jobs.solr_data_export.archive_service_logs.solr.sort_column[0]=logtime
-infra-manager.jobs.solr_data_export.archive_service_logs.solr.sort_column[1]=id
-infra-manager.jobs.solr_data_export.archive_service_logs.solr.delete_query_text=logtime:[${start.logtime} TO ${end.logtime}} OR (logtime:${end.logtime} AND id:[* TO ${end.id}])
-infra-manager.jobs.solr_data_export.archive_service_logs.read_block_size=2000
-infra-manager.jobs.solr_data_export.archive_service_logs.write_block_size=1000
-infra-manager.jobs.solr_data_export.archive_service_logs.destination=HDFS
-infra-manager.jobs.solr_data_export.archive_service_logs.file_name_suffix_column=logtime
-infra-manager.jobs.solr_data_export.archive_service_logs.file_name_suffix_date_format=yyyy-MM-dd'T'HH-mm-ss.SSSX
-infra-manager.jobs.solr_data_export.archive_service_logs.hdfs_endpoint=hdfs://c6401.ambari.apache.org:8020
-infra-manager.jobs.solr_data_export.archive_service_logs.hdfs_destination_directory=/archived_service_logs
+infra-manager.jobs.solr_data_archiving.archive_service_logs.enabled=true
+infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.zoo_keeper_connection_string=c6401.ambari.apache.org:2181/infra-solr
+infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.collection=hadoop_logs
+infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.query_text=logtime:[${start} TO ${end}]
+infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.filter_query_text=(logtime:${logtime} AND id:{${id} TO *]) OR logtime:{${logtime} TO ${end}]
+infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.sort_column[0]=logtime
+infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.sort_column[1]=id
+infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.delete_query_text=logtime:[${start.logtime} TO ${end.logtime}} OR (logtime:${end.logtime} AND id:[* TO ${end.id}])
+infra-manager.jobs.solr_data_archiving.archive_service_logs.read_block_size=2000
+infra-manager.jobs.solr_data_archiving.archive_service_logs.write_block_size=1000
+infra-manager.jobs.solr_data_archiving.archive_service_logs.destination=HDFS
+infra-manager.jobs.solr_data_archiving.archive_service_logs.file_name_suffix_column=logtime
+infra-manager.jobs.solr_data_archiving.archive_service_logs.file_name_suffix_date_format=yyyy-MM-dd'T'HH-mm-ss.SSSX
+infra-manager.jobs.solr_data_archiving.archive_service_logs.hdfs_endpoint=hdfs://c6401.ambari.apache.org:8020
+infra-manager.jobs.solr_data_archiving.archive_service_logs.hdfs_destination_directory=/archived_service_logs
 # Note: set hdfs user using the HADOOP_USER_NAME environmental variable. Value: hdfs
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
oleewere@apache.org.