Posted to commits@ambari.apache.org by am...@apache.org on 2018/01/05 07:56:22 UTC

[26/45] ambari git commit: AMBARI-22702. Infra Manager: scheduled deleting of Infra Solr documents (Krisztian Kasa via oleewere)

AMBARI-22702. Infra Manager: scheduled deleting of Infra Solr documents (Krisztian Kasa via oleewere)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a85ff23b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a85ff23b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a85ff23b

Branch: refs/heads/branch-feature-AMBARI-22008-isilon
Commit: a85ff23bd8737a17544983c44752a9fd602d864a
Parents: c813e05
Author: Krisztian Kasa <ka...@gmail.com>
Authored: Wed Jan 3 11:38:43 2018 +0100
Committer: Oliver Szabo <ol...@gmail.com>
Committed: Wed Jan 3 11:38:43 2018 +0100

----------------------------------------------------------------------
 .../ambari/infra/OffsetDateTimeConverter.java   |   2 +-
 .../ambari/infra/steps/AbstractInfraSteps.java  |   4 +-
 .../ambari/infra/steps/ExportJobsSteps.java     |  48 ++++++-
 .../resources/stories/infra_api_tests.story     |  31 ++--
 .../docker/infra-manager-docker-compose.sh      |   2 +
 ambari-infra/ambari-infra-manager/pom.xml       |   5 +
 .../apache/ambari/infra/job/JobProperties.java  |  47 ++++++
 .../apache/ambari/infra/job/JobPropertyMap.java |  57 ++++++++
 .../apache/ambari/infra/job/PropertyMap.java    |  25 ++++
 .../apache/ambari/infra/job/SolrDAOBase.java    |  65 +++++++++
 .../infra/job/archive/AbstractFileAction.java   |  15 +-
 .../infra/job/archive/CompositeFileAction.java  |   4 +-
 .../ambari/infra/job/archive/Document.java      |   2 +-
 .../archive/DocumentArchivingConfiguration.java | 142 +++++++++++++++++++
 .../archive/DocumentExportConfiguration.java    | 140 ------------------
 .../job/archive/DocumentExportJobListener.java  |  58 --------
 .../job/archive/DocumentExportProperties.java   | 117 +++++++++------
 .../job/archive/DocumentExportPropertyMap.java  |   8 +-
 .../ambari/infra/job/archive/DocumentWiper.java |  23 +++
 .../ambari/infra/job/archive/FileAction.java    |   2 +-
 .../job/archive/FileNameSuffixFormatter.java    |  56 ++++++++
 .../infra/job/archive/ItemWriterListener.java   |   4 +-
 .../job/archive/LocalDocumentItemWriter.java    |  17 ++-
 .../job/archive/LocalItemWriterListener.java    |  36 +++++
 .../ambari/infra/job/archive/S3Properties.java  |  23 ++-
 .../ambari/infra/job/archive/S3Uploader.java    |   4 +-
 .../ambari/infra/job/archive/SolrDAO.java       |  68 +++++++++
 .../infra/job/archive/SolrDocumentIterator.java |   3 +-
 .../infra/job/archive/SolrDocumentSource.java   |  41 +-----
 .../job/archive/SolrParametrizedString.java     |  77 ++++++++++
 .../infra/job/archive/SolrProperties.java       | 117 +++++++++++++++
 .../infra/job/archive/SolrQueryBuilder.java     |  69 +++------
 .../infra/job/archive/TarGzCompressor.java      |   2 +-
 .../infra/job/archive/WriteCompletedEvent.java  |  45 ++++++
 .../deleting/DocumentDeletingConfiguration.java |  90 ++++++++++++
 .../deleting/DocumentDeletingProperties.java    |  77 ++++++++++
 .../deleting/DocumentDeletingPropertyMap.java   |  44 ++++++
 .../job/deleting/DocumentWiperTasklet.java      |  49 +++++++
 .../src/main/resources/infra-manager.properties |  48 ++++---
 .../ambari/infra/job/JobPropertiesTest.java     |  56 ++++++++
 .../archive/DocumentExportPropertiesTest.java   |  54 -------
 .../archive/FileNameSuffixFormatterTest.java    |  58 ++++++++
 .../archive/LocalDocumentItemWriterTest.java    |  21 ++-
 .../job/archive/SolrParametrizedStringTest.java |  57 ++++++++
 .../infra/job/archive/SolrPropertiesTest.java   |  54 +++++++
 .../infra/job/archive/SolrQueryBuilderTest.java |  39 ++---
 46 files changed, 1531 insertions(+), 475 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/OffsetDateTimeConverter.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/OffsetDateTimeConverter.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/OffsetDateTimeConverter.java
index 9db562c..ef469a4 100644
--- a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/OffsetDateTimeConverter.java
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/OffsetDateTimeConverter.java
@@ -25,7 +25,7 @@ import java.time.OffsetDateTime;
 import java.time.format.DateTimeFormatter;
 
 public class OffsetDateTimeConverter implements ParameterConverters.ParameterConverter {
-  private static final DateTimeFormatter SOLR_DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX");
+  public static final DateTimeFormatter SOLR_DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX");
 
   @Override
   public boolean accept(Type type) {
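Widening SOLR_DATETIME_FORMATTER to public lets the test steps reuse the same pattern when they build logtime range queries. A minimal sketch of the output it produces, with an illustrative timestamp (assumes java.time.OffsetDateTime and java.time.ZoneOffset imports):

    // Illustrative only, not part of the commit.
    OffsetDateTime time = OffsetDateTime.of(2017, 12, 31, 10, 15, 30, 0, ZoneOffset.UTC);
    String solrTime = OffsetDateTimeConverter.SOLR_DATETIME_FORMATTER.format(time);
    // solrTime is "2017-12-31T10:15:30.000Z" -- the zero offset is rendered as "Z" by the trailing X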

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java
index 703e1cf..f48d4c2 100644
--- a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java
@@ -46,7 +46,6 @@ import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.time.OffsetDateTime;
 import java.util.Date;
-import java.util.UUID;
 import java.util.function.BooleanSupplier;
 
 import static java.lang.System.currentTimeMillis;
@@ -64,6 +63,7 @@ public abstract class AbstractInfraSteps {
   private String dockerHost;
   private SolrClient solrClient;
   private AmazonS3Client s3client;
+  private int documentId = 0;
 
   public InfraClient getInfraClient() {
     return new InfraClient(String.format("http://%s:%d/api/v1/jobs", dockerHost, INFRA_MANAGER_PORT));
@@ -189,7 +189,7 @@ public abstract class AbstractInfraSteps {
     solrInputDocument.addField("action", "getfileinfo");
     solrInputDocument.addField("log_message", "allowed=true\tugi=ambari-qa (auth:SIMPLE)\tip=/192.168.64.102\tcmd=getfileinfo\tsrc=/ats/active\tdst=null\tperm=null\tproto=rpc\tcallerContext=HIVE_QUERY_ID:ambari-qa_20160317200111_223b3079-4a2d-431c-920f-6ba37ed63e9f");
     solrInputDocument.addField("logger_name", "FSNamesystem.audit");
-    solrInputDocument.addField("id", UUID.randomUUID().toString());
+    solrInputDocument.addField("id", Integer.toString(documentId++));
     solrInputDocument.addField("authType", "SIMPLE");
     solrInputDocument.addField("logfile_line_number", 1);
     solrInputDocument.addField("cliIP", "/192.168.64.102");

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java
index 4a09d7d..22826a0 100644
--- a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java
@@ -23,6 +23,8 @@ import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import org.apache.ambari.infra.InfraClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
 import org.jbehave.core.annotations.Alias;
 import org.jbehave.core.annotations.Given;
 import org.jbehave.core.annotations.Then;
@@ -31,9 +33,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.time.Duration;
 import java.time.OffsetDateTime;
 
+import static org.apache.ambari.infra.OffsetDateTimeConverter.SOLR_DATETIME_FORMATTER;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.hasProperty;
 import static org.hamcrest.core.IsCollectionContaining.hasItem;
@@ -44,8 +50,10 @@ public class ExportJobsSteps extends AbstractInfraSteps {
 
   @Given("$count documents in solr")
   public void addDocuments(int count) throws Exception {
-    for (int i = 0; i < count; ++i)
-      addDocument(OffsetDateTime.now().minusMinutes(i));
+    OffsetDateTime intervalEnd = OffsetDateTime.now();
+    for (int i = 0; i < count; ++i) {
+      addDocument(intervalEnd.minusMinutes(i % (count / 10)));
+    }
     getSolrClient().commit();
   }
 
@@ -84,7 +92,7 @@ public class ExportJobsSteps extends AbstractInfraSteps {
   }
 
   @Then("Check filenames contains the text $text on s3 server after $waitSec seconds")
-  public void checkS3After(String text, int waitSec) throws Exception {
+  public void checkS3After(String text, int waitSec) {
     AmazonS3Client s3Client = getS3client();
     ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
     doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME)
@@ -103,4 +111,38 @@ public class ExportJobsSteps extends AbstractInfraSteps {
             .filter(s3ObjectSummary -> s3ObjectSummary.getKey().contains(text))
             .count() == count);
   }
+
+  @Then("No file exists on s3 server with filenames containing the text $text")
+  public void fileNotExistOnS3(String text) {
+    AmazonS3Client s3Client = getS3client();
+    ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
+    assertThat(s3Client.listObjects(listObjectsRequest).getObjectSummaries().stream()
+            .anyMatch(s3ObjectSummary -> s3ObjectSummary.getKey().contains(text)), is(false));
+  }
+
+  @Then("solr contains $count documents between $startLogtime and $endLogtime")
+  public void documentCount(int count, OffsetDateTime startLogTime, OffsetDateTime endLogTime) throws Exception {
+    SolrQuery query = new SolrQuery();
+    query.setRows(count * 2);
+    query.setQuery(String.format("logtime:[\"%s\" TO \"%s\"]", SOLR_DATETIME_FORMATTER.format(startLogTime), SOLR_DATETIME_FORMATTER.format(endLogTime)));
+    assertThat(getSolrClient().query(query).getResults().size(), is(count));
+  }
+
+  @Then("solr does not contain documents between $startLogtime and $endLogtime after $waitSec seconds")
+  public void isSolrEmpty(OffsetDateTime startLogTime, OffsetDateTime endLogTime, int waitSec) {
+    SolrQuery query = new SolrQuery();
+    query.setRows(1);
+    query.setQuery(String.format("logtime:[\"%s\" TO \"%s\"]", SOLR_DATETIME_FORMATTER.format(startLogTime), SOLR_DATETIME_FORMATTER.format(endLogTime)));
+    doWithin(waitSec, "check solr is empty", () -> isSolrEmpty(query));
+  }
+
+  private boolean isSolrEmpty(SolrQuery query) {
+    try {
+      return getSolrClient().query(query).getResults().isEmpty();
+    } catch (SolrServerException e) {
+      throw new RuntimeException(e);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story b/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story
index cd1f49d..1044258 100644
--- a/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story
+++ b/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story
@@ -1,23 +1,38 @@
-Scenario: Export documents form solr and upload them to s3 using defult configuration
+Scenario: Exporting documents from solr and uploading them to s3 using the default configuration
 
 Given 1000 documents in solr
-When start export_audit_logs job
+When start archive_audit_logs job
 Then Check filenames contains the text audit_logs on s3 server after 20 seconds
 
 
 Scenario: Exporting 10 documents using writeBlockSize=3 produces 4 files
 
 Given 10 documents in solr with logtime from 2010-10-09T05:00:00.000Z to 2010-10-09T20:00:00.000Z
-When start export_audit_logs job with parameters writeBlockSize=3,start=2010-10-09T00:00:00.000Z,end=2010-10-11T00:00:00.000Z
+When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-10-09T00:00:00.000Z,end=2010-10-11T00:00:00.000Z
 Then Check 4 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2010-10-09 after 20 seconds
+And solr does not contain documents between 2010-10-09T05:00:00.000Z and 2010-10-09T20:00:00.000Z after 5 seconds
 
+Scenario: Running the archiving job with a start value greater than the end value exports and deletes 0 documents
 
-Scenario: Export job fails when part of the data is exported. After resolving the issue and restarting the job exports the rest of the data.
+Given 10 documents in solr with logtime from 2010-01-01T05:00:00.000Z to 2010-01-04T05:00:00.000Z
+When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-01-03T05:00:00.000Z,end=2010-01-02T05:00:00.000Z
+Then No file exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2010-01-0
+And solr contains 10 documents between 2010-01-01T05:00:00.000Z and 2010-01-04T05:00:00.000Z
+
+Scenario: Archiving job fails when only part of the data is exported. After resolving the issue and restarting, the job exports the rest of the data.
 
 Given 200 documents in solr with logtime from 2011-10-09T05:00:00.000Z to 2011-10-09T20:00:00.000Z
-And a file on s3 with key solr_archive_audit_logs_-_2011-10-09T08:00:00.000Z.json.tar.gz
-When start export_audit_logs job with parameters writeBlockSize=20,start=2010-11-09T00:00:00.000Z,end=2011-10-11T00:00:00.000Z
+And a file on s3 with key solr_archive_audit_logs_-_2011-10-09T08-00-00.000Z.json.tar.gz
+When start archive_audit_logs job with parameters writeBlockSize=20,start=2010-11-09T00:00:00.000Z,end=2011-10-11T00:00:00.000Z
 Then Check 3 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2011-10-09 after 20 seconds
-When delete file with key solr_archive_audit_logs_-_2011-10-09T08:00:00.000Z.json.tar.gz from s3
-And restart export_audit_logs job with parameters writeBlockSize=20,start=2010-11-09T00:00:00.000Z,end=2011-10-11T00:00:00.000Z
+And solr does not contain documents between 2011-10-09T05:00:00.000Z and 2011-10-09T07:59:59.999Z after 5 seconds
+When delete file with key solr_archive_audit_logs_-_2011-10-09T08-00-00.000Z.json.tar.gz from s3
+And restart archive_audit_logs job with parameters writeBlockSize=20,start=2010-11-09T00:00:00.000Z,end=2011-10-11T00:00:00.000Z
 Then Check 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2011-10-09 after 20 seconds
+And solr does not contain documents between 2011-10-09T05:00:00.000Z and 2011-10-09T20:00:00.000Z after 5 seconds
+
+Scenario: After the deleting job deletes documents from solr, no documents are found in the specified interval
+
+Given 10 documents in solr with logtime from 2012-10-09T05:00:00.000Z to 2012-10-09T20:00:00.000Z
+When start delete_audit_logs job with parameters start=2012-10-09T05:00:00.000Z,end=2012-10-09T20:00:00.000Z
+Then solr does not contain documents between 2012-10-09T05:00:00.000Z and 2012-10-09T20:00:00.000Z after 5 seconds
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh b/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh
old mode 100644
new mode 100755
index ab02659..6ddb7c2
--- a/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh
+++ b/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh
@@ -81,6 +81,8 @@ function setup_profile() {
   local AMBARI_LOCATION=$(pwd)
   popd
   cat << EOF > $sdir/Profile
+AWS_ACCESS_KEY_ID=test
+AWS_SECRET_ACCESS_KEY=test
 EOF
 }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/pom.xml b/ambari-infra/ambari-infra-manager/pom.xml
index 67bf7d1..de131b0 100644
--- a/ambari-infra/ambari-infra-manager/pom.xml
+++ b/ambari-infra/ambari-infra-manager/pom.xml
@@ -428,6 +428,11 @@
       <artifactId>aws-java-sdk-s3</artifactId>
       <version>1.11.5</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-csv</artifactId>
+      <version>1.5</version>
+    </dependency>
   </dependencies>
 
 </project>
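The new commons-csv dependency backs the s3AccessFile option added to DocumentExportProperties further down: instead of configuring the S3 keys directly, the export job can read them from a credential CSV. A minimal parsing sketch with the same header names used in loadS3Properties below, assuming a hypothetical two-column file whose first line holds the key pair:

    import org.apache.commons.csv.CSVFormat;
    import org.apache.commons.csv.CSVParser;
    import org.apache.commons.csv.CSVRecord;

    import java.io.FileReader;
    import java.io.IOException;

    // Illustrative sketch, not part of the commit; the file path is hypothetical.
    public class S3CredentialCsvExample {
      public static void main(String[] args) throws IOException {
        try (CSVParser parser = CSVParser.parse(new FileReader("/tmp/s3_credentials.csv"),
                CSVFormat.DEFAULT.withHeader("Access key ID", "Secret access key"))) {
          CSVRecord record = parser.iterator().next();
          System.out.println("access key: " + record.get("Access key ID"));
          System.out.println("secret key: " + record.get("Secret access key"));
        }
      }
    }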

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java
new file mode 100644
index 0000000..0841dd7
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job;
+
+import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
+import org.springframework.batch.core.JobParameters;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+public abstract class JobProperties<T extends JobProperties<T>> {
+  private final Class<T> clazz;
+
+  protected JobProperties(Class<T> clazz) {
+    this.clazz = clazz;
+  }
+
+  public T deepCopy() {
+    try {
+      ObjectMapper objectMapper = new ObjectMapper();
+      String json = objectMapper.writeValueAsString(this);
+      return objectMapper.readValue(json, clazz);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  public abstract void apply(JobParameters jobParameters);
+
+  public abstract void validate();
+}
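JobProperties factors out the deep-copy/apply/validate contract that job-specific properties classes (such as DocumentExportProperties below) now follow: defaults are copied, overridden from the JobParameters, then validated before the job runs. A minimal sketch of a subclass, with illustrative field and parameter names:

    import org.apache.ambari.infra.job.JobProperties;
    import org.springframework.batch.core.JobParameters;

    // Illustrative sketch, not part of the commit.
    public class ExampleJobProperties extends JobProperties<ExampleJobProperties> {
      private String collection;

      public ExampleJobProperties() {
        super(ExampleJobProperties.class);
      }

      public String getCollection() { return collection; }
      public void setCollection(String collection) { this.collection = collection; }

      @Override
      public void apply(JobParameters jobParameters) {
        // job parameters override the copied defaults; see JobPropertyMap below
        collection = jobParameters.getString("collection", collection);
      }

      @Override
      public void validate() {
        if (collection == null || collection.isEmpty())
          throw new IllegalArgumentException("The property collection can not be null or empty string!");
      }
    }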

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobPropertyMap.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobPropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobPropertyMap.java
new file mode 100644
index 0000000..b5061f8
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobPropertyMap.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job;
+
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobExecutionListener;
+
+public class JobPropertyMap<T extends JobProperties<T>> implements JobExecutionListener {
+
+  private final PropertyMap<T> propertyMap;
+
+  public JobPropertyMap(PropertyMap<T> propertyMap) {
+    this.propertyMap = propertyMap;
+  }
+
+  @Override
+  public void beforeJob(JobExecution jobExecution) {
+    try {
+      String jobName = jobExecution.getJobInstance().getJobName();
+      T defaultProperties = propertyMap.getPropertyMap().get(jobName);
+      if (defaultProperties == null)
+        throw new UnsupportedOperationException("Properties not found for job " + jobName);
+
+      T properties = defaultProperties.deepCopy();
+      properties.apply(jobExecution.getJobParameters());
+      properties.validate();
+      jobExecution.getExecutionContext().put("jobProperties", properties);
+    }
+    catch (UnsupportedOperationException | IllegalArgumentException ex) {
+      jobExecution.stop();
+      jobExecution.setExitStatus(new ExitStatus(ExitStatus.FAILED.getExitCode(), ex.getMessage()));
+      throw ex;
+    }
+  }
+
+  @Override
+  public void afterJob(JobExecution jobExecution) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
new file mode 100644
index 0000000..e7b9e77
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job;
+
+import java.util.Map;
+
+public interface PropertyMap<T extends JobProperties<T>> {
+  Map<String, T> getPropertyMap();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SolrDAOBase.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SolrDAOBase.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SolrDAOBase.java
new file mode 100644
index 0000000..3ac5b05
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SolrDAOBase.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job;
+
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+public abstract class SolrDAOBase {
+  private static final Logger LOG = LoggerFactory.getLogger(SolrDAOBase.class);
+
+  private final String zooKeeperConnectionString;
+  private final String defaultCollection;
+
+  protected SolrDAOBase(String zooKeeperConnectionString, String defaultCollection) {
+    this.zooKeeperConnectionString = zooKeeperConnectionString;
+    this.defaultCollection = defaultCollection;
+  }
+
+  protected void delete(String deleteQueryText) {
+    try (CloudSolrClient client = createClient()) {
+      try {
+        LOG.info("Executing solr delete by query {}", deleteQueryText);
+        client.deleteByQuery(deleteQueryText);
+        client.commit();
+      } catch (Exception e) {
+        try {
+          client.rollback();
+        } catch (SolrServerException e1) {
+          LOG.warn("Unable to rollback after solr delete operation failure.", e1);
+        }
+        throw new RuntimeException(e);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  protected CloudSolrClient createClient() {
+    CloudSolrClient client = new CloudSolrClient.Builder().withZkHost(zooKeeperConnectionString).build();
+    client.setDefaultCollection(defaultCollection);
+    return client;
+  }
+}
+
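SolrDAOBase gives the archiving and deleting jobs a shared way to reach a Solr collection through ZooKeeper, and delete() wraps deleteByQuery with commit and rollback handling. A minimal sketch of a subclass removing a logtime interval; the collection name is hypothetical and the range-query format mirrors the one used in the integration test steps above:

    import org.apache.ambari.infra.job.SolrDAOBase;

    // Illustrative sketch, not part of the commit.
    public class AuditLogCleaner extends SolrDAOBase {
      public AuditLogCleaner(String zooKeeperConnectionString) {
        super(zooKeeperConnectionString, "hadoop_logs"); // hypothetical collection name
      }

      public void deleteLogtimeRange(String start, String end) {
        // same range query format as the integration test steps above
        delete(String.format("logtime:[\"%s\" TO \"%s\"]", start, end));
      }
    }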

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/AbstractFileAction.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/AbstractFileAction.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/AbstractFileAction.java
index 7a30393..3df18b6 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/AbstractFileAction.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/AbstractFileAction.java
@@ -18,16 +18,21 @@
  */
 package org.apache.ambari.infra.job.archive;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 
 public abstract class AbstractFileAction implements FileAction {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractFileAction.class);
+
   @Override
-  public File perform(File inputFile, boolean deleteInput) {
-    File outputFile =  perform(inputFile);
-    if (deleteInput)
-      inputFile.delete();
+  public File perform(File inputFile) {
+    File outputFile =  onPerform(inputFile);
+    if (!inputFile.delete())
+      LOG.warn("File {} was not deleted. Exists: {}", inputFile.getAbsolutePath(), inputFile.exists());
     return outputFile;
   }
 
-  protected abstract File perform(File inputFile);
+  protected abstract File onPerform(File inputFile);
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java
index 8421802..99bc6d9 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java
@@ -37,10 +37,10 @@ public class CompositeFileAction implements FileAction {
   }
 
   @Override
-  public File perform(File inputFile, boolean deleteInput) {
+  public File perform(File inputFile) {
     File file = inputFile;
     for (FileAction action : actions) {
-      file = action.perform(file, deleteInput);
+      file = action.perform(file);
     }
     return file;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java
index 1f3957a..5ff9587 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java
@@ -42,7 +42,7 @@ public class Document {
   }
 
   @JsonAnyGetter
-  private Map<String, String> getFieldMap() {
+  public Map<String, String> getFieldMap() {
     return fieldMap;
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
new file mode 100644
index 0000000..7588b99
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.apache.ambari.infra.job.JobPropertyMap;
+import org.apache.ambari.infra.job.ObjectSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.Job;
+import org.springframework.batch.core.Step;
+import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
+import org.springframework.batch.core.configuration.annotation.JobScope;
+import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
+import org.springframework.batch.core.configuration.annotation.StepScope;
+import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.PostConstruct;
+import javax.inject.Inject;
+import java.io.File;
+import java.nio.file.Paths;
+
+@Configuration
+public class DocumentArchivingConfiguration {
+  private static final Logger LOG = LoggerFactory.getLogger(DocumentArchivingConfiguration.class);
+
+  @Inject
+  private DocumentExportPropertyMap propertyMap;
+
+  @Inject
+  private StepBuilderFactory steps;
+
+  @Inject
+  private JobBuilderFactory jobs;
+
+  @Inject
+  @Qualifier("exportStep")
+  private Step exportStep;
+
+  @Inject
+  private JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor;
+
+
+  @PostConstruct
+  public void createJobs() {
+    propertyMap.getSolrDataExport().values().forEach(DocumentExportProperties::validate);
+
+    propertyMap.getSolrDataExport().keySet().forEach(jobName -> {
+      LOG.info("Registering data archiving job {}", jobName);
+      Job job = logExportJob(jobName, exportStep);
+      jobRegistryBeanPostProcessor.postProcessAfterInitialization(job, jobName);
+    });
+  }
+
+  private Job logExportJob(String jobName, Step logExportStep) {
+    return jobs.get(jobName).listener(new JobPropertyMap<>(propertyMap)).start(logExportStep).build();
+  }
+
+  @Bean
+  @JobScope
+  public Step exportStep(DocumentExporter documentExporter) {
+    return steps.get("export")
+            .tasklet(documentExporter)
+            .build();
+  }
+
+  @Bean
+  @StepScope
+  public DocumentExporter documentExporter(DocumentItemReader documentItemReader,
+                                           @Value("#{stepExecution.jobExecution.id}") String jobId,
+                                           @Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentExportProperties properties,
+                                           SolrDAO solrDAO) {
+    File path = Paths.get(
+            properties.getDestinationDirectoryPath(),
+            // TODO: jobId should remain the same after continuing job
+            String.format("%s_%s", properties.getSolr().getCollection(), jobId)).toFile(); // TODO: add end date
+    LOG.info("Destination directory path={}", path);
+    if (!path.exists()) {
+      if (!path.mkdirs()) {
+        LOG.warn("Unable to create directory {}", path);
+      }
+    }
+
+    CompositeFileAction fileAction = new CompositeFileAction(new TarGzCompressor());
+    properties.s3Properties().ifPresent(s3Properties -> fileAction.add(new S3Uploader(s3Properties)));
+    FileNameSuffixFormatter fileNameSuffixFormatter = FileNameSuffixFormatter.from(properties);
+    LocalItemWriterListener itemWriterListener = new LocalItemWriterListener(fileAction, solrDAO);
+
+    return new DocumentExporter(
+            documentItemReader,
+            firstDocument -> new LocalDocumentItemWriter(
+                    outFile(properties.getSolr().getCollection(), path, fileNameSuffixFormatter.format(firstDocument)), itemWriterListener),
+            properties.getWriteBlockSize());
+  }
+
+  @Bean
+  @StepScope
+  public SolrDAO solrDAO(@Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentExportProperties properties) {
+    return new SolrDAO(properties.getSolr());
+  }
+
+  private File outFile(String collection, File directoryPath, String suffix) {
+    File file = new File(directoryPath, String.format("%s_-_%s.json", collection, suffix));
+    LOG.info("Exporting to temp file {}", file.getAbsolutePath());
+    return file;
+  }
+
+  @Bean
+  @StepScope
+  public DocumentItemReader reader(ObjectSource<Document> documentSource,
+                                   @Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentExportProperties properties) {
+    return new DocumentItemReader(documentSource, properties.getReadBlockSize());
+  }
+
+  @Bean
+  @StepScope
+  public ObjectSource<Document> logSource(@Value("#{jobParameters[start]}") String start,
+                                          @Value("#{jobParameters[end]}") String end,
+                                          SolrDAO solrDAO) {
+
+    return new SolrDocumentSource(solrDAO, start, end);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java
deleted file mode 100644
index 1895911..0000000
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.infra.job.archive;
-
-import org.apache.ambari.infra.job.ObjectSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.batch.core.Job;
-import org.springframework.batch.core.Step;
-import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
-import org.springframework.batch.core.configuration.annotation.JobScope;
-import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
-import org.springframework.batch.core.configuration.annotation.StepScope;
-import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import javax.annotation.PostConstruct;
-import javax.inject.Inject;
-import java.io.File;
-import java.nio.file.Paths;
-
-@Configuration
-public class DocumentExportConfiguration {
-  private static final Logger LOG = LoggerFactory.getLogger(DocumentExportConfiguration.class);
-
-  @Inject
-  private DocumentExportPropertyMap propertyMap;
-
-  @Inject
-  private StepBuilderFactory steps;
-
-  @Inject
-  private JobBuilderFactory jobs;
-
-  @Inject
-  @Qualifier("exportStep")
-  private Step exportStep;
-
-  @Inject
-  private JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor;
-
-
-  @PostConstruct
-  public void createJobs() {
-    propertyMap.getSolrDataExport().values().forEach(DocumentExportProperties::validate);
-
-    propertyMap.getSolrDataExport().keySet().forEach(jobName -> {
-      Job job = logExportJob(jobName, exportStep);
-      jobRegistryBeanPostProcessor.postProcessAfterInitialization(job, jobName);
-    });
-  }
-
-  private Job logExportJob(String jobName, Step logExportStep) {
-    return jobs.get(jobName).listener(new DocumentExportJobListener(propertyMap)).start(logExportStep).build();
-  }
-
-  @Bean
-  @JobScope
-  public Step exportStep(DocumentExporter documentExporter) {
-    return steps.get("export")
-            .tasklet(documentExporter)
-            .build();
-  }
-
-  @Bean
-  @StepScope
-  public DocumentExporter documentExporter(DocumentItemReader documentItemReader,
-                                           @Value("#{stepExecution.jobExecution.id}") String jobId,
-                                           @Value("#{stepExecution.jobExecution.executionContext.get('exportProperties')}") DocumentExportProperties properties) {
-    File path = Paths.get(
-            properties.getDestinationDirectoryPath(),
-            // TODO: jobId should remain the same after continuing job
-            String.format("%s_%s", properties.getQuery().getCollection(), jobId)).toFile(); // TODO: add end date
-    LOG.info("Destination directory path={}", path);
-    if (!path.exists()) {
-      if (!path.mkdirs()) {
-        LOG.warn("Unable to create directory {}", path);
-      }
-    }
-
-    CompositeFileAction fileAction = new CompositeFileAction(new TarGzCompressor());
-    properties.s3Properties().ifPresent(s3Properties -> fileAction.add(new S3Uploader(s3Properties)));
-
-    return new DocumentExporter(
-            documentItemReader,
-            firstDocument -> localDocumentItemWriter(properties, path, fileAction, firstDocument),
-            properties.getWriteBlockSize());
-  }
-
-  private LocalDocumentItemWriter localDocumentItemWriter(DocumentExportProperties properties, File path, FileAction fileAction, Document firstDocument) {
-    return new LocalDocumentItemWriter(outFile(properties.getQuery().getCollection(), path, firstDocument.get(properties.getFileNameSuffixColumn())),
-            file -> fileAction.perform(file, true));
-  }
-
-  private File outFile(String collection, File directoryPath, String suffix) {
-    // TODO: format date (suffix)
-    File file = new File(directoryPath, String.format("%s_-_%s.json", collection, suffix));
-    LOG.info("Exporting to temp file {}", file.getAbsolutePath());
-    return file;
-  }
-
-  @Bean
-  @StepScope
-  public DocumentItemReader reader(ObjectSource<Document> documentSource,
-                                   @Value("#{stepExecution.jobExecution.executionContext.get('exportProperties')}") DocumentExportProperties properties) {
-    return new DocumentItemReader(documentSource, properties.getReadBlockSize());
-  }
-
-  @Bean
-  @StepScope
-  public ObjectSource logSource(@Value("#{jobParameters[start]}") String start,
-                                @Value("#{jobParameters[end]}") String end,
-                                @Value("#{stepExecution.jobExecution.executionContext.get('exportProperties')}") DocumentExportProperties properties) {
-
-    return new SolrDocumentSource(
-            properties.getZooKeeperConnectionString(),
-            properties.getQuery(),
-            start,
-            end);
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java
deleted file mode 100644
index 3b6c402..0000000
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.infra.job.archive;
-
-import org.springframework.batch.core.ExitStatus;
-import org.springframework.batch.core.JobExecution;
-import org.springframework.batch.core.JobExecutionListener;
-
-public class DocumentExportJobListener implements JobExecutionListener {
-
-  private final DocumentExportPropertyMap propertyMap;
-
-  public DocumentExportJobListener(DocumentExportPropertyMap propertyMap) {
-    this.propertyMap = propertyMap;
-  }
-
-
-  @Override
-  public void beforeJob(JobExecution jobExecution) {
-    try {
-      String jobName = jobExecution.getJobInstance().getJobName();
-      DocumentExportProperties defaultProperties = propertyMap.getSolrDataExport().get(jobName);
-      if (defaultProperties == null)
-        throw new UnsupportedOperationException("Properties not found for job " + jobName);
-
-      DocumentExportProperties properties = defaultProperties.deepCopy();
-      properties.apply(jobExecution.getJobParameters());
-      properties.validate();
-      jobExecution.getExecutionContext().put("exportProperties", properties);
-    }
-    catch (UnsupportedOperationException | IllegalArgumentException ex) {
-      jobExecution.stop();
-      jobExecution.setExitStatus(new ExitStatus(ExitStatus.FAILED.getExitCode(), ex.getMessage()));
-      throw ex;
-    }
-  }
-
-  @Override
-  public void afterJob(JobExecution jobExecution) {
-    jobExecution.setExitStatus(new ExitStatus(ExitStatus.COMPLETED.getExitCode()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java
index 37f6d1b..1484eed 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java
@@ -18,34 +18,71 @@
  */
 package org.apache.ambari.infra.job.archive;
 
-import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.ambari.infra.job.JobProperties;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
 import org.springframework.batch.core.JobParameters;
 
+import java.io.FileReader;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.Optional;
+import java.util.function.Supplier;
 
+import static org.apache.commons.csv.CSVFormat.DEFAULT;
 import static org.apache.commons.lang.StringUtils.isBlank;
 
-public class DocumentExportProperties {
-  private String zooKeeperConnectionString;
+public class DocumentExportProperties extends JobProperties<DocumentExportProperties> {
   private int readBlockSize;
   private int writeBlockSize;
   private String destinationDirectoryPath;
   private String fileNameSuffixColumn;
-  private SolrQueryProperties query;
-  private String s3AccessKey;
-  private String s3SecretKey;
+  private String fileNameSuffixDateFormat;
+  private SolrProperties solr;
+  private String s3AccessFile;
   private String s3KeyPrefix;
   private String s3BucketName;
   private String s3Endpoint;
+  private transient Supplier<Optional<S3Properties>> s3Properties;
+
+  public DocumentExportProperties() {
+    super(DocumentExportProperties.class);
+    s3Properties = this::loadS3Properties;
+  }
+
+  private Optional<S3Properties> loadS3Properties() {
+    if (isBlank(s3BucketName))
+      return Optional.empty();
+
+    String accessKey = System.getenv("AWS_ACCESS_KEY_ID");
+    String secretKey = System.getenv("AWS_SECRET_ACCESS_KEY");
+
+    if (isBlank(accessKey) || isBlank(secretKey)) {
+      if (isBlank(s3AccessFile))
+        return Optional.empty();
+      try (CSVParser csvParser = CSVParser.parse(new FileReader(s3AccessFile), DEFAULT.withHeader("Access key ID", "Secret access key"))) {
+        Iterator<CSVRecord> iterator = csvParser.iterator();
+        if (!iterator.hasNext()) {
+          return Optional.empty();
+        }
+
+        CSVRecord record = csvParser.iterator().next();
+        Map<String, Integer> header = csvParser.getHeaderMap();
+        accessKey = record.get(header.get("Access key ID"));
+        secretKey = record.get(header.get("Secret access key"));
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
 
-  public String getZooKeeperConnectionString() {
-    return zooKeeperConnectionString;
-  }
-
-  public void setZooKeeperConnectionString(String zooKeeperConnectionString) {
-    this.zooKeeperConnectionString = zooKeeperConnectionString;
+    return Optional.of(new S3Properties(
+            accessKey,
+            secretKey,
+            s3KeyPrefix,
+            s3BucketName,
+            s3Endpoint));
   }
 
   public int getReadBlockSize() {
@@ -80,28 +117,29 @@ public class DocumentExportProperties {
     this.fileNameSuffixColumn = fileNameSuffixColumn;
   }
 
-  public SolrQueryProperties getQuery() {
-    return query;
+  public String getFileNameSuffixDateFormat() {
+    return fileNameSuffixDateFormat;
   }
 
-  public void setQuery(SolrQueryProperties query) {
-    this.query = query;
+  public void setFileNameSuffixDateFormat(String fileNameSuffixDateFormat) {
+    this.fileNameSuffixDateFormat = fileNameSuffixDateFormat;
   }
 
-  public String getS3AccessKey() {
-    return s3AccessKey;
+  public SolrProperties getSolr() {
+    return solr;
   }
 
-  public void setS3AccessKey(String s3AccessKey) {
-    this.s3AccessKey = s3AccessKey;
+  public void setSolr(SolrProperties query) {
+    this.solr = query;
   }
 
-  public String getS3SecretKey() {
-    return s3SecretKey;
+  public String getS3AccessFile() {
+    return s3AccessFile;
   }
 
-  public void setS3SecretKey(String s3SecretKey) {
-    this.s3SecretKey = s3SecretKey;
+  public void setS3AccessFile(String s3AccessFile) {
+    this.s3AccessFile = s3AccessFile;
+    s3Properties = this::loadS3Properties;
   }
 
   public String getS3KeyPrefix() {
@@ -128,12 +166,16 @@ public class DocumentExportProperties {
     this.s3Endpoint = s3Endpoint;
   }
 
+  public Optional<S3Properties> s3Properties() {
+    return s3Properties.get();
+  }
+
+  @Override
   public void apply(JobParameters jobParameters) {
-    zooKeeperConnectionString = jobParameters.getString("zooKeeperConnectionString", zooKeeperConnectionString);
     readBlockSize = getIntJobParameter(jobParameters, "readBlockSize", readBlockSize);
     writeBlockSize = getIntJobParameter(jobParameters, "writeBlockSize", writeBlockSize);
     destinationDirectoryPath = jobParameters.getString("destinationDirectoryPath", destinationDirectoryPath);
-    query.apply(jobParameters);
+    solr.apply(jobParameters);
   }
 
   private int getIntJobParameter(JobParameters jobParameters, String parameterName, int defaultValue) {
@@ -143,26 +185,8 @@ public class DocumentExportProperties {
     return Integer.parseInt(valueText);
   }
 
-  public DocumentExportProperties deepCopy() {
-    try {
-      ObjectMapper objectMapper = new ObjectMapper();
-      String json = objectMapper.writeValueAsString(this);
-      return objectMapper.readValue(json, DocumentExportProperties.class);
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    }
-  }
-
-  public Optional<S3Properties> s3Properties() {
-    if (!isBlank(s3AccessKey) && !isBlank(s3SecretKey) && !isBlank(s3BucketName))
-      return Optional.of(new S3Properties(s3AccessKey, s3SecretKey, s3KeyPrefix, s3BucketName, s3Endpoint));
-    return Optional.empty();
-  }
-
+  @Override
   public void validate() {
-    if (isBlank(zooKeeperConnectionString))
-      throw new IllegalArgumentException("The property zooKeeperConnectionString can not be null or empty string!");
-
     if (readBlockSize == 0)
       throw new IllegalArgumentException("The property readBlockSize must be greater than 0!");
 
@@ -175,6 +199,7 @@ public class DocumentExportProperties {
     if (isBlank(fileNameSuffixColumn))
       throw new IllegalArgumentException("The property fileNameSuffixColumn can not be null or empty string!");
 
-    query.validate();
+    solr.validate();
+    s3Properties().ifPresent(S3Properties::validate);
   }
 }
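With this refactoring the S3 credentials are resolved lazily: s3Properties() first looks at the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables (the docker profile above sets test values for them) and only then falls back to the CSV named by s3AccessFile. A minimal usage sketch; the bucket name and path are illustrative, and the s3BucketName setter is assumed to exist alongside the accessors shown in this hunk:

    // Illustrative only, not part of the commit; setS3BucketName is assumed, not shown above.
    DocumentExportProperties properties = new DocumentExportProperties();
    properties.setS3BucketName("log-archive");                               // hypothetical bucket
    properties.setS3AccessFile("/etc/ambari-infra-manager/conf/s3_credentials.csv"); // hypothetical path
    properties.s3Properties().ifPresent(s3 -> System.out.println("S3 upload enabled"));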

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportPropertyMap.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportPropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportPropertyMap.java
index 9af4afc..16e77d2 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportPropertyMap.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportPropertyMap.java
@@ -18,6 +18,7 @@
  */
 package org.apache.ambari.infra.job.archive;
 
+import org.apache.ambari.infra.job.PropertyMap;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Configuration;
 
@@ -25,7 +26,7 @@ import java.util.Map;
 
 @Configuration
 @ConfigurationProperties(prefix = "infra-manager.jobs")
-public class DocumentExportPropertyMap {
+public class DocumentExportPropertyMap implements PropertyMap<DocumentExportProperties> {
   private Map<String, DocumentExportProperties> solrDataExport;
 
   public Map<String, DocumentExportProperties> getSolrDataExport() {
@@ -35,4 +36,9 @@ public class DocumentExportPropertyMap {
   public void setSolrDataExport(Map<String, DocumentExportProperties> solrDataExport) {
     this.solrDataExport = solrDataExport;
   }
+
+  @Override
+  public Map<String, DocumentExportProperties> getPropertyMap() {
+    return getSolrDataExport();
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentWiper.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentWiper.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentWiper.java
new file mode 100644
index 0000000..2b2a355
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentWiper.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+public interface DocumentWiper {
+  void delete(Document firstDocument, Document lastDocument);
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java
index d3f2a65..26a8c63 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java
@@ -21,5 +21,5 @@ package org.apache.ambari.infra.job.archive;
 import java.io.File;
 
 public interface FileAction {
-  File perform(File inputFile, boolean deleteInput);
+  File perform(File inputFile);
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java
new file mode 100644
index 0000000..af48ab9
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+
+import static org.apache.ambari.infra.job.archive.SolrDocumentIterator.SOLR_DATE_FORMAT_TEXT;
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+public class FileNameSuffixFormatter {
+  private static final DateTimeFormatter SOLR_DATETIME_FORMATTER = DateTimeFormatter.ofPattern(SOLR_DATE_FORMAT_TEXT);
+
+  public static FileNameSuffixFormatter from(DocumentExportProperties properties) {
+    return new FileNameSuffixFormatter(properties.getFileNameSuffixColumn(), properties.getFileNameSuffixDateFormat());
+  }
+
+
+  private final String columnName;
+
+  private final DateTimeFormatter dateFormat;
+
+  public FileNameSuffixFormatter(String columnName, String dateTimeFormat) {
+    this.columnName = columnName;
+    dateFormat = isBlank(dateTimeFormat) ? null : DateTimeFormatter.ofPattern(dateTimeFormat);
+  }
+
+  public String format(Document document) {
+    if (document == null)
+      throw new NullPointerException("Can not format file name suffix: input document is null!");
+
+    if (isBlank(document.get(columnName)))
+      throw new IllegalArgumentException("The specified document does not have a column " + columnName + " or its value is blank!");
+
+    if (dateFormat == null)
+      return document.get(columnName);
+    OffsetDateTime date = OffsetDateTime.parse(document.get(columnName), SOLR_DATETIME_FORMATTER);
+    return date.format(dateFormat);
+  }
+}

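A minimal, self-contained sketch of the conversion the new FileNameSuffixFormatter performs: the value of fileNameSuffixColumn is parsed with the shared Solr date pattern and re-rendered with fileNameSuffixDateFormat. The suffix pattern used below is an assumed configuration value, not one taken from this commit.

import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

// Standalone sketch of FileNameSuffixFormatter's date branch; the suffix
// pattern "yyyy-MM-dd'T'HH-mm-ss" is an assumed configuration value.
public class SuffixFormatSketch {
  public static void main(String[] args) {
    DateTimeFormatter solrFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX");
    DateTimeFormatter suffixFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH-mm-ss");

    OffsetDateTime logtime = OffsetDateTime.parse("2017-12-31T23:59:59.999Z", solrFormat);
    // Colons are replaced by the pattern, so the suffix is safe to embed in an archive file name.
    System.out.println(logtime.format(suffixFormat)); // 2017-12-31T23-59-59
  }
}
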
http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ItemWriterListener.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ItemWriterListener.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ItemWriterListener.java
index 7427771..33a67cb 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ItemWriterListener.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ItemWriterListener.java
@@ -18,8 +18,6 @@
  */
 package org.apache.ambari.infra.job.archive;
 
-import java.io.File;
-
 public interface ItemWriterListener {
-  void onCompleted(File file);
+  void onCompleted(WriteCompletedEvent event);
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java
index baad61b..531d2d5 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java
@@ -20,16 +20,22 @@ package org.apache.ambari.infra.job.archive;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.*;
 
 public class LocalDocumentItemWriter implements DocumentItemWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalDocumentItemWriter.class);
+
   private static final ObjectMapper json = new ObjectMapper();
   private static final String ENCODING = "UTF-8";
 
   private final File outFile;
   private final BufferedWriter bufferedWriter;
   private final ItemWriterListener itemWriterListener;
+  private Document firstDocument = null;
+  private Document lastDocument = null;
 
   public LocalDocumentItemWriter(File outFile, ItemWriterListener itemWriterListener) {
     this.itemWriterListener = itemWriterListener;
@@ -48,6 +54,11 @@ public class LocalDocumentItemWriter implements DocumentItemWriter {
     try {
       bufferedWriter.write(json.writeValueAsString(document));
       bufferedWriter.newLine();
+
+      if (firstDocument == null)
+        firstDocument = document;
+
+      lastDocument = document;
     }
     catch (IOException e) {
       throw new UncheckedIOException(e);
@@ -57,14 +68,16 @@ public class LocalDocumentItemWriter implements DocumentItemWriter {
   @Override
   public void revert() {
     IOUtils.closeQuietly(bufferedWriter);
-    outFile.delete();
+    if (!outFile.delete())
+      LOG.warn("File {} was not deleted. Exists: {}", outFile.getAbsolutePath(), outFile.exists());
   }
 
   @Override
   public void close() {
     try {
       bufferedWriter.close();
-      itemWriterListener.onCompleted(outFile);
+      if (itemWriterListener != null)
+        itemWriterListener.onCompleted(new WriteCompletedEvent(outFile, firstDocument, lastDocument));
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalItemWriterListener.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalItemWriterListener.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalItemWriterListener.java
new file mode 100644
index 0000000..a24d524
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalItemWriterListener.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+public class LocalItemWriterListener implements ItemWriterListener {
+  private final FileAction fileAction;
+  private final DocumentWiper documentWiper;
+
+  public LocalItemWriterListener(FileAction fileAction, DocumentWiper documentWiper) {
+    this.fileAction = fileAction;
+    this.documentWiper = documentWiper;
+  }
+
+
+  @Override
+  public void onCompleted(WriteCompletedEvent event) {
+    fileAction.perform(event.getOutFile());
+    documentWiper.delete(event.getFirstDocument(), event.getLastDocument());
+  }
+}

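Both FileAction and DocumentWiper have a single abstract method, so stand-in implementations can be supplied as lambdas. The wiring below is only a sketch; in the production job the FileAction is presumably the compress/upload chain and the DocumentWiper is SolrDAO, wired up in DocumentArchivingConfiguration.

package org.apache.ambari.infra.job.archive;

// Hypothetical wiring of the new listener with stand-in lambdas.
public class ListenerWiringSketch {
  public static void main(String[] args) {
    FileAction logOnly = file -> {
      System.out.println("would hand off " + file.getName());
      return file;
    };
    DocumentWiper noOpWiper = (first, last) ->
        System.out.println("would delete the archived document range from Solr");

    ItemWriterListener listener = new LocalItemWriterListener(logOnly, noOpWiper);
    // A LocalDocumentItemWriter created with this listener calls
    // listener.onCompleted(new WriteCompletedEvent(outFile, firstDocument, lastDocument))
    // from close(), which triggers the file action first and the wipe second.
  }
}
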
http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java
index 0979f10..88b71cf 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java
@@ -18,12 +18,14 @@
  */
 package org.apache.ambari.infra.job.archive;
 
+import static org.apache.commons.lang.StringUtils.isBlank;
+
 public class S3Properties {
-  private String s3AccessKey;
-  private String s3SecretKey;
-  private String s3KeyPrefix;
-  private String s3BucketName;
-  private String s3EndPoint;
+  private final String s3AccessKey;
+  private final String s3SecretKey;
+  private final String s3KeyPrefix;
+  private final String s3BucketName;
+  private final String s3EndPoint;
 
   public S3Properties(String s3AccessKey, String s3SecretKey, String s3KeyPrefix, String s3BucketName, String s3EndPoint) {
     this.s3AccessKey = s3AccessKey;
@@ -62,4 +64,15 @@ public class S3Properties {
             ", s3EndPoint='" + s3EndPoint + '\'' +
             '}';
   }
+
+  public void validate() {
+    if (isBlank(s3AccessKey))
+      throw new IllegalArgumentException("The property s3AccessKey can not be null or empty string!");
+
+    if (isBlank(s3SecretKey))
+      throw new IllegalArgumentException("The property s3SecretKey can not be null or empty string!");
+
+    if (isBlank(s3BucketName))
+      throw new IllegalArgumentException("The property s3BucketName can not be null or empty string!");
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java
index deeb9c7..0ab68ed 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java
@@ -29,7 +29,7 @@ import static org.apache.commons.lang.StringUtils.isBlank;
  */
 public class S3Uploader extends AbstractFileAction {
 
-  private static final Logger LOG = LoggerFactory.getLogger(DocumentExportConfiguration.class);
+  private static final Logger LOG = LoggerFactory.getLogger(S3Uploader.class);
 
   private final AmazonS3Client client;
   private final String keyPrefix;
@@ -49,7 +49,7 @@ public class S3Uploader extends AbstractFileAction {
   }
 
   @Override
-  public File perform(File inputFile) {
+  public File onPerform(File inputFile) {
     String key = keyPrefix + inputFile.getName();
 
     if (client.doesObjectExist(bucketName, key)) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDAO.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDAO.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDAO.java
new file mode 100644
index 0000000..fba08e7
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDAO.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.apache.ambari.infra.job.SolrDAOBase;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+public class SolrDAO extends SolrDAOBase implements DocumentWiper {
+  private static final Logger LOG = LoggerFactory.getLogger(SolrDAO.class);
+
+  private final SolrProperties queryProperties;
+
+  public SolrDAO(SolrProperties queryProperties) {
+    super(queryProperties.getZooKeeperConnectionString(), queryProperties.getCollection());
+    this.queryProperties = queryProperties;
+  }
+
+  @Override
+  public void delete(Document firstDocument, Document lastDocument) {
+    delete(new SolrParametrizedString(queryProperties.getDeleteQueryText())
+            .set("start", firstDocument.getFieldMap())
+            .set("end", lastDocument.getFieldMap()).toString());
+  }
+
+  public SolrDocumentIterator query(String start, String end, Document subIntervalFrom, int rows) {
+    SolrQuery query = queryProperties.toQueryBuilder()
+            .setInterval(start, end)
+            .setDocument(subIntervalFrom)
+            .build();
+    query.setRows(rows);
+
+    LOG.info("Executing solr query {}", query.toLocalParamsString());
+
+    try {
+      CloudSolrClient client = createClient();
+      QueryResponse response = client.query(query);
+      return new SolrDocumentIterator(response, client);
+    } catch (SolrServerException e) {
+      throw new RuntimeException(e);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java
index 2e7341d..f8d8382 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java
@@ -34,7 +34,8 @@ import java.util.TimeZone;
 
 public class SolrDocumentIterator implements CloseableIterator<Document> {
 
-  private static final DateFormat SOLR_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");
+  public static final String SOLR_DATE_FORMAT_TEXT = "yyyy-MM-dd'T'HH:mm:ss.SSSX";
+  private static final DateFormat SOLR_DATE_FORMAT = new SimpleDateFormat(SOLR_DATE_FORMAT_TEXT);
 
   static {
     SOLR_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentSource.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentSource.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentSource.java
index 5ded9ac..39ddd1e 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentSource.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentSource.java
@@ -20,53 +20,20 @@ package org.apache.ambari.infra.job.archive;
 
 import org.apache.ambari.infra.job.CloseableIterator;
 import org.apache.ambari.infra.job.ObjectSource;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.time.format.DateTimeFormatter;
 
 public class SolrDocumentSource implements ObjectSource<Document> {
-  public static final DateTimeFormatter SOLR_DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX");
-  private static final Logger LOG = LoggerFactory.getLogger(SolrDocumentSource.class);
-
-  private final String zkConnectionString;
-  private final SolrQueryProperties properties;
+  private final SolrDAO solrDAO;
   private final String start;
   private final String end;
 
-  public SolrDocumentSource(String zkConnectionString, SolrQueryProperties properties, String start, String end) {
-    this.zkConnectionString = zkConnectionString;
-    this.properties = properties;
+  public SolrDocumentSource(SolrDAO solrDAO, String start, String end) {
+    this.solrDAO = solrDAO;
     this.start = start;
     this.end = end;
   }
 
   @Override
   public CloseableIterator<Document> open(Document current, int rows) {
-    CloudSolrClient client = new CloudSolrClient.Builder().withZkHost(zkConnectionString).build();
-    client.setDefaultCollection(properties.getCollection());
-
-    SolrQuery query = properties.toQueryBuilder()
-            .setInterval(start, end)
-            .setDocument(current)
-            .build();
-    query.setRows(rows);
-
-    LOG.info("Executing solr query {}", query.toLocalParamsString());
-
-    try {
-      QueryResponse response = client.query(query);
-      return new SolrDocumentIterator(response, client);
-    } catch (SolrServerException e) {
-      throw new RuntimeException(e);
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    }
+    return solrDAO.query(start, end, current, rows);
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrParametrizedString.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrParametrizedString.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrParametrizedString.java
new file mode 100644
index 0000000..9770982
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrParametrizedString.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.apache.solr.client.solrj.util.ClientUtils;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SolrParametrizedString {
+  private static final String PARAMETER_PATTERN = "\\$\\{%s[a-z0-9A-Z]+}";
+  private static final Pattern NO_PREFIX_PARAMETER_PATTERN = Pattern.compile(String.format(PARAMETER_PATTERN, ""));
+
+  private final String string;
+
+  public SolrParametrizedString(String string) {
+    this.string = string;
+  }
+
+  private Set<String> collectParamNames(Pattern regExPattern) {
+    Matcher matcher = regExPattern.matcher(string);
+    Set<String> parameters = new HashSet<>();
+    while (matcher.find())
+      parameters.add(matcher.group().replace("${", "").replace("}", ""));
+    return parameters;
+  }
+
+  @Override
+  public String toString() {
+    return string;
+  }
+
+  public SolrParametrizedString set(Map<String, String> parameterMap) {
+    return set(NO_PREFIX_PARAMETER_PATTERN, null, parameterMap);
+  }
+
+  public SolrParametrizedString set(String prefix, Map<String, String> parameterMap) {
+    String dottedPrefix = prefix + ".";
+    return set(Pattern.compile(String.format(PARAMETER_PATTERN, dottedPrefix)), dottedPrefix, parameterMap);
+  }
+
+  private SolrParametrizedString set(Pattern regExPattern, String prefix, Map<String, String> parameterMap) {
+    String newString = string;
+    for (String paramName : collectParamNames(regExPattern)) {
+      String paramSuffix = prefix == null ? paramName : paramName.replace(prefix, "");
+      if (parameterMap.get(paramSuffix) != null)
+        newString = newString.replace(String.format("${%s}", paramName), getValue(parameterMap, paramSuffix));
+    }
+    return new SolrParametrizedString(newString);
+  }
+
+  private String getValue(Map<String, String> parameterMap, String paramSuffix) {
+    String value = parameterMap.get(paramSuffix);
+    if ("*".equals(value))
+      return value;
+    return ClientUtils.escapeQueryChars(value);
+  }
+}

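A short, self-contained illustration of the new template substitution. The template and field name are invented for the example; real delete queries come from the deleteQueryText property and the field maps of the first and last documents of each written file, as in SolrDAO.delete above.

package org.apache.ambari.infra.job.archive;

import java.util.HashMap;
import java.util.Map;

// Illustrative template and field name only; real values come from
// deleteQueryText and the first/last documents of an archived file.
public class ParametrizedStringSketch {
  public static void main(String[] args) {
    Map<String, String> first = new HashMap<>();
    first.put("logtime", "2017-12-01T00:00:00.000Z");
    Map<String, String> last = new HashMap<>();
    last.put("logtime", "2017-12-31T23:59:59.999Z");

    String deleteQuery = new SolrParametrizedString("logtime:[${start.logtime} TO ${end.logtime}]")
            .set("start", first)
            .set("end", last)
            .toString();

    // Substituted values are escaped with ClientUtils.escapeQueryChars unless they
    // are "*", so the colons and dashes in the timestamps come out backslash-escaped.
    System.out.println(deleteQuery);
  }
}
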
http://git-wip-us.apache.org/repos/asf/ambari/blob/a85ff23b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java
new file mode 100644
index 0000000..a2a78c2
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.springframework.batch.core.JobParameters;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+public class SolrProperties {
+  private String zooKeeperConnectionString;
+  private String collection;
+  private String queryText;
+  private String filterQueryText;
+  private String[] sortColumn;
+  private String deleteQueryText;
+
+  public String getZooKeeperConnectionString() {
+    return zooKeeperConnectionString;
+  }
+
+  public void setZooKeeperConnectionString(String zooKeeperConnectionString) {
+    this.zooKeeperConnectionString = zooKeeperConnectionString;
+  }
+
+  public String getCollection() {
+    return collection;
+  }
+
+  public void setCollection(String collection) {
+    this.collection = collection;
+  }
+
+  public String getQueryText() {
+    return queryText;
+  }
+
+  public void setQueryText(String queryText) {
+    this.queryText = queryText;
+  }
+
+  public String getFilterQueryText() {
+    return filterQueryText;
+  }
+
+  public void setFilterQueryText(String filterQueryText) {
+    this.filterQueryText = filterQueryText;
+  }
+
+  public String[] getSortColumn() {
+    return sortColumn;
+  }
+
+  public void setSortColumn(String[] sortColumn) {
+    this.sortColumn = sortColumn;
+  }
+
+  public String getDeleteQueryText() {
+    return deleteQueryText;
+  }
+
+  public void setDeleteQueryText(String deleteQueryText) {
+    this.deleteQueryText = deleteQueryText;
+  }
+
+  public SolrQueryBuilder toQueryBuilder() {
+    return new SolrQueryBuilder().
+            setQueryText(queryText)
+            .setFilterQueryText(filterQueryText)
+            .addSort(sortColumn);
+  }
+
+  public void apply(JobParameters jobParameters) {
+    zooKeeperConnectionString = jobParameters.getString("zooKeeperConnectionString", zooKeeperConnectionString);
+    collection = jobParameters.getString("collection", collection);
+    queryText = jobParameters.getString("queryText", queryText);
+    filterQueryText = jobParameters.getString("filterQueryText", filterQueryText);
+    deleteQueryText = jobParameters.getString("deleteQueryText", deleteQueryText);
+
+    String sortValue;
+    List<String> sortColumns = new ArrayList<>();
+    int i = 0;
+    while ((sortValue = jobParameters.getString(String.format("sortColumn[%d]", i))) != null) {
+      sortColumns.add(sortValue);
+      ++i;
+    }
+
+    if (sortColumns.size() > 0)
+      sortColumn = sortColumns.toArray(new String[sortColumns.size()]);
+  }
+
+  public void validate() {
+    if (isBlank(zooKeeperConnectionString))
+      throw new IllegalArgumentException("The property zooKeeperConnectionString can not be null or empty string!");
+
+    if (isBlank(collection))
+      throw new IllegalArgumentException("The property collection can not be null or empty string!");
+  }
+}
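
The apply method lets launch-time JobParameters override the values loaded from infra-manager.properties. A hypothetical example follows; only the parameter keys are taken from apply() above, the ZooKeeper address and collection names are assumptions.

package org.apache.ambari.infra.job.archive;

import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;

// Hypothetical values; only the parameter keys are taken from SolrProperties.apply().
public class SolrPropertiesOverrideSketch {
  public static void main(String[] args) {
    SolrProperties solr = new SolrProperties();
    solr.setZooKeeperConnectionString("zookeeper:2181/infra-solr");
    solr.setCollection("hadoop_logs");

    JobParameters parameters = new JobParametersBuilder()
            .addString("collection", "audit_logs")   // overrides the configured collection
            .addString("sortColumn[0]", "logtime")   // indexed keys populate the sortColumn array
            .toJobParameters();

    solr.apply(parameters);
    solr.validate(); // still passes: zooKeeperConnectionString and collection are set

    System.out.println(solr.getCollection()); // audit_logs
  }
}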