You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2017/11/30 14:25:19 UTC

[25/50] [abbrv] ambari git commit: AMBARI-22514. Initial implementation of Schedulable document deletion & archiving for Infra Solr (Krisztian Kasa via oleewere)

AMBARI-22514. Initial implementation of Schedulable document deletion & archiving for Infra Solr (Krisztian Kasa via oleewere)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/393fdb80
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/393fdb80
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/393fdb80

Branch: refs/heads/branch-feature-AMBARI-21674
Commit: 393fdb8048ff579e8a55cd1b477a23d1bf105576
Parents: 2bf3c8e
Author: Krisztian Kasa <ka...@gmail.com>
Authored: Tue Nov 28 15:45:22 2017 +0100
Committer: Oliver Szabo <ol...@gmail.com>
Committed: Tue Nov 28 15:47:59 2017 +0100

----------------------------------------------------------------------
 ambari-infra/ambari-infra-manager/pom.xml       |  11 ++
 .../infra/job/archive/CompositeFileAction.java  |  46 +++++
 .../ambari/infra/job/archive/Document.java      |  54 +++++
 .../infra/job/archive/DocumentDestination.java  |  23 +++
 .../archive/DocumentExportConfiguration.java    | 118 +++++++++++
 .../job/archive/DocumentExportJobListener.java  |  35 ++++
 .../job/archive/DocumentExportProperties.java   | 112 +++++++++++
 .../job/archive/DocumentExportStepListener.java |  47 +++++
 .../infra/job/archive/DocumentExporter.java     |  99 ++++++++++
 .../infra/job/archive/DocumentItemReader.java   | 135 +++++++++++++
 .../infra/job/archive/DocumentItemWriter.java   |  25 +++
 .../infra/job/archive/DocumentIterator.java     |  25 +++
 .../infra/job/archive/DocumentSource.java       |  24 +++
 .../ambari/infra/job/archive/FileAction.java    |  25 +++
 .../job/archive/LocalDocumentItemWriter.java    |  72 +++++++
 .../ambari/infra/job/archive/S3Properties.java  |  64 ++++++
 .../ambari/infra/job/archive/S3Uploader.java    |  51 +++++
 .../infra/job/archive/SolrDocumentIterator.java |  90 +++++++++
 .../infra/job/archive/SolrDocumentSource.java   |  68 +++++++
 .../infra/job/archive/SolrQueryBuilder.java     | 115 +++++++++++
 .../infra/job/archive/SolrQueryProperties.java  |  69 +++++++
 .../infra/job/archive/TarGzCompressor.java      |  50 +++++
 .../apache/ambari/infra/manager/JobManager.java |  21 +-
 .../src/main/resources/infra-manager.properties |  12 ++
 .../src/main/resources/log4j2.xml               |   2 +-
 .../infra/job/archive/DocumentExporterTest.java | 147 ++++++++++++++
 .../job/archive/DocumentItemReaderTest.java     | 197 +++++++++++++++++++
 .../archive/LocalDocumentItemWriterTest.java    |  98 +++++++++
 .../infra/job/archive/SolrQueryBuilderTest.java | 113 +++++++++++
 .../test-config/logfeeder/logfeeder.properties  |   2 +-
 30 files changed, 1940 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/pom.xml b/ambari-infra/ambari-infra-manager/pom.xml
index aa86da8..67bf7d1 100644
--- a/ambari-infra/ambari-infra-manager/pom.xml
+++ b/ambari-infra/ambari-infra-manager/pom.xml
@@ -141,6 +141,12 @@
       <version>3.4</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <version>1.3</version>
+      <scope>test</scope>
+    </dependency>
     <!-- Spring dependencies -->
     <dependency>
       <groupId>org.springframework</groupId>
@@ -417,6 +423,11 @@
       <groupId>com.google.guava</groupId>
       <version>20.0</version>
     </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-s3</artifactId>
+      <version>1.11.5</version>
+    </dependency>
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java
new file mode 100644
index 0000000..84ce160
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+
+/**
+ * A {@link FileAction} that runs a sequence of actions in order, passing the file
+ * returned by each action as the input of the next one.
+ */
+public class CompositeFileAction implements FileAction {
+
+  private final List<FileAction> actions;
+
+  /**
+   * @param actions the initial actions, performed in the order given
+   */
+  public CompositeFileAction(FileAction... actions) {
+    // Arrays.asList returns a fixed-size view of the array, on which add() throws
+    // UnsupportedOperationException; copy it so that add(FileAction) works.
+    this.actions = new ArrayList<>(asList(actions));
+  }
+
+  /**
+   * Appends an action that will be performed after all previously registered ones.
+   *
+   * @param action the action to append
+   */
+  public void add(FileAction action) {
+    actions.add(action);
+  }
+
+  /**
+   * Performs every registered action in order.
+   *
+   * @param inputFile the file handed to the first action
+   * @return the file returned by the last action, or {@code inputFile} itself when no action is registered
+   */
+  @Override
+  public File perform(File inputFile) {
+    File file = inputFile;
+    for (FileAction action : actions) {
+      file = action.perform(file);
+    }
+    return file;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java
new file mode 100644
index 0000000..84f5ece
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import com.fasterxml.jackson.annotation.JsonAnyGetter;
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Collections.unmodifiableMap;
+
+// TODO: create entities for each solr collections
+/**
+ * A document held as a map of field names to string values.
+ */
+public class Document {
+  private final Map<String, String> fieldMap;
+
+  // Starts with an empty, mutable map that the private put(...) below fills in;
+  // presumably only used by Jackson during deserialization (see @JsonAnySetter).
+  private Document() {
+    this.fieldMap = new HashMap<>();
+  }
+
+  /**
+   * Creates a document backed by a read-only view of the given map.
+   *
+   * @param fieldMap field names mapped to their values
+   */
+  public Document(Map<String, String> fieldMap) {
+    this.fieldMap = unmodifiableMap(fieldMap);
+  }
+
+  /**
+   * @param key the field name
+   * @return the value stored for {@code key}, or {@code null} when the field is absent
+   */
+  public String get(String key) {
+    return this.fieldMap.get(key);
+  }
+
+  // Exposes all fields as top-level properties of the serialized document.
+  @JsonAnyGetter
+  private Map<String, String> getFieldMap() {
+    return this.fieldMap;
+  }
+
+  // Stores a single field value in the map.
+  @JsonAnySetter
+  private void put(String fieldName, String fieldValue) {
+    this.fieldMap.put(fieldName, fieldValue);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentDestination.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentDestination.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentDestination.java
new file mode 100644
index 0000000..f647a36
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentDestination.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+/**
+ * Factory of {@link DocumentItemWriter}s for exported documents.
+ */
+public interface DocumentDestination {
+  /**
+   * Opens a new writer.
+   *
+   * @param firstDocument the first document that is going to be written to the returned writer
+   *                      (DocumentExportConfiguration derives the output file name from it)
+   * @return the writer to write documents to
+   */
+  DocumentItemWriter open(Document firstDocument);
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java
new file mode 100644
index 0000000..69f41d3
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.Job;
+import org.springframework.batch.core.Step;
+import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
+import org.springframework.batch.core.configuration.annotation.JobScope;
+import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
+import org.springframework.batch.core.configuration.annotation.StepScope;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.nio.file.Paths;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+
+import static org.apache.ambari.infra.job.archive.SolrDocumentSource.SOLR_DATETIME_FORMATTER;
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+@Configuration
+public class DocumentExportConfiguration {
+  private static final Logger LOG = LoggerFactory.getLogger(DocumentExportConfiguration.class);
+  private static final DateTimeFormatter FILENAME_DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH_mm_ss.SSSX");
+
+  @Inject
+  private DocumentExportProperties properties;
+
+  @Inject
+  private StepBuilderFactory steps;
+
+  @Inject
+  private JobBuilderFactory jobs;
+
+
+
+  @Bean
+  public Job logExportJob(@Qualifier("exportStep") Step logExportStep) {
+    return jobs.get("solr_data_export").listener(new DocumentExportJobListener()).start(logExportStep).build();
+  }
+
+  @Bean
+  @JobScope
+  public Step exportStep(DocumentExporter documentExporter) {
+    return steps.get("export")
+            .tasklet(documentExporter)
+            .listener(new DocumentExportStepListener(properties))
+            .build();
+  }
+
+  @Bean
+  @StepScope
+  public DocumentExporter getDocumentExporter(DocumentItemReader documentItemReader,
+                                              @Value("#{stepExecution.jobExecution.id}") String jobId) {
+    File path = Paths.get(
+            properties.getDestinationDirectoryPath(),
+            String.format("%s_%s", properties.getQuery().getCollection(), jobId)).toFile(); // TODO: add end date
+    LOG.info("Destination directory path={}", path);
+    if (!path.exists()) {
+      if (!path.mkdirs()) {
+        LOG.warn("Unable to create directory {}", path);
+      }
+    }
+
+    CompositeFileAction fileAction = new CompositeFileAction(new TarGzCompressor());
+
+    return new DocumentExporter(
+            documentItemReader,
+            firstDocument -> new LocalDocumentItemWriter(
+                    new File(path, String.format("%s_-_%s.json",
+                            properties.getQuery().getCollection(),
+                            firstDocument.get(properties.getFileNameSuffixColumn()))),
+                    fileAction),
+            properties.getWriteBlockSize());
+  }
+
+  @Bean
+  @StepScope
+  public DocumentItemReader reader(DocumentSource documentSource) {
+    return new DocumentItemReader(documentSource, properties.getReadBlockSize());
+  }
+
+  @Bean
+  @StepScope
+  public DocumentSource logSource(@Value("#{jobParameters[endDate]}") String endDateText) {
+    OffsetDateTime endDate = OffsetDateTime.now(ZoneOffset.UTC);
+    if (!isBlank(endDateText))
+      endDate = OffsetDateTime.parse(endDateText);
+
+    return new SolrDocumentSource(
+            properties.getZooKeeperSocket(),
+            properties.getQuery(),
+            SOLR_DATETIME_FORMATTER.format(endDate));
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java
new file mode 100644
index 0000000..f1df46c
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobExecutionListener;
+
+public class DocumentExportJobListener implements JobExecutionListener {
+  @Override
+  public void beforeJob(JobExecution jobExecution) {
+
+  }
+
+  @Override
+  public void afterJob(JobExecution jobExecution) {
+    jobExecution.setExitStatus(new ExitStatus(ExitStatus.COMPLETED.getExitCode()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java
new file mode 100644
index 0000000..d6301c0
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.hibernate.validator.constraints.NotBlank;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.PropertySource;
+
+import javax.validation.constraints.Min;
+
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+/**
+ * Settings of the {@code solr_data_export} job, bound from the
+ * {@code infra-manager.jobs.solr_data_export} properties and optionally overridden
+ * by job parameters through {@link #apply(JobParameters)}.
+ */
+@Configuration
+@PropertySource(value = {"classpath:infra-manager.properties"})
+@ConfigurationProperties(prefix = "infra-manager.jobs.solr_data_export")
+public class DocumentExportProperties {
+  @NotBlank
+  private String zooKeeperSocket;
+  @Min(1)
+  private int readBlockSize;
+  @Min(1)
+  private int writeBlockSize;
+  @NotBlank
+  private String destinationDirectoryPath;
+  @NotBlank
+  private String fileNameSuffixColumn;
+  private SolrQueryProperties query;
+
+  public String getZooKeeperSocket() {
+    return zooKeeperSocket;
+  }
+
+  public void setZooKeeperSocket(String zooKeeperSocket) {
+    this.zooKeeperSocket = zooKeeperSocket;
+  }
+
+  public int getReadBlockSize() {
+    return readBlockSize;
+  }
+
+  public void setReadBlockSize(int readBlockSize) {
+    this.readBlockSize = readBlockSize;
+  }
+
+  public int getWriteBlockSize() {
+    return writeBlockSize;
+  }
+
+  public void setWriteBlockSize(int writeBlockSize) {
+    this.writeBlockSize = writeBlockSize;
+  }
+
+  public String getDestinationDirectoryPath() {
+    return destinationDirectoryPath;
+  }
+
+  public void setDestinationDirectoryPath(String destinationDirectoryPath) {
+    this.destinationDirectoryPath = destinationDirectoryPath;
+  }
+
+  /**
+   * Overrides the configured values with the job parameters that are present; a setting
+   * without a matching job parameter keeps its current value.
+   *
+   * @param jobParameters the parameters of the job being executed
+   * @throws NumberFormatException when a block size parameter is not a valid integer
+   */
+  public void apply(JobParameters jobParameters) {
+    // TODO: solr query params
+    zooKeeperSocket = jobParameters.getString("zooKeeperSocket", zooKeeperSocket);
+    readBlockSize = getIntJobParameter(jobParameters, "readBlockSize", readBlockSize);
+    writeBlockSize = getIntJobParameter(jobParameters, "writeBlockSize", writeBlockSize);
+    destinationDirectoryPath = jobParameters.getString("destinationDirectoryPath", destinationDirectoryPath);
+    query.setCollection(jobParameters.getString("collection", query.getCollection()));
+    query.setQueryText(jobParameters.getString("queryText", query.getQueryText()));
+    query.setFilterQueryText(jobParameters.getString("filterQueryText", query.getFilterQueryText()));
+  }
+
+  /**
+   * Reads an integer job parameter.
+   *
+   * @param jobParameters the parameters to read from
+   * @param parameterName the name of the parameter
+   * @param defaultValue the value returned when the parameter is missing or blank
+   * @return the parsed parameter value or {@code defaultValue}
+   */
+  private int getIntJobParameter(JobParameters jobParameters, String parameterName, int defaultValue) {
+    String parameterText = jobParameters.getString(parameterName);
+    if (isBlank(parameterText))
+      return defaultValue;
+    // Only return the parsed value: assigning it to writeBlockSize here (as before) made a
+    // readBlockSize parameter silently overwrite writeBlockSize as well.
+    return Integer.parseInt(parameterText);
+  }
+
+  public String getFileNameSuffixColumn() {
+    return fileNameSuffixColumn;
+  }
+
+  public void setFileNameSuffixColumn(String fileNameSuffixColumn) {
+    this.fileNameSuffixColumn = fileNameSuffixColumn;
+  }
+
+  public SolrQueryProperties getQuery() {
+    return query;
+  }
+
+  public void setQuery(SolrQueryProperties query) {
+    this.query = query;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportStepListener.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportStepListener.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportStepListener.java
new file mode 100644
index 0000000..3bab6d5
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportStepListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.StepExecutionListener;
+
+/**
+ * Step listener of the export step: applies the job parameters to the export properties
+ * before the step runs and logs the start and the end of the step.
+ */
+public class DocumentExportStepListener implements StepExecutionListener {
+  private static final Logger LOG = LoggerFactory.getLogger(DocumentExportStepListener.class);
+
+  private final DocumentExportProperties properties;
+
+  /**
+   * @param properties the export properties that receive the job parameter overrides
+   */
+  public DocumentExportStepListener(final DocumentExportProperties properties) {
+    this.properties = properties;
+  }
+
+  /**
+   * Overrides the export properties with the parameters of the job the step belongs to.
+   */
+  @Override
+  public void beforeStep(final StepExecution stepExecution) {
+    this.properties.apply(stepExecution.getJobParameters());
+    LOG.info("LogExport step - before step execution");
+  }
+
+  /**
+   * @return the exit status of the step, unchanged
+   */
+  @Override
+  public ExitStatus afterStep(final StepExecution stepExecution) {
+    LOG.info("LogExport step - after step execution");
+    final ExitStatus unchangedStatus = stepExecution.getExitStatus();
+    return unchangedStatus;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java
new file mode 100644
index 0000000..6106c20
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.StepExecutionListener;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.item.ExecutionContext;
+import org.springframework.batch.item.ItemStreamReader;
+import org.springframework.batch.repeat.RepeatStatus;
+
+/**
+ * Tasklet that copies every document supplied by the reader to writers obtained from the
+ * destination, switching to a new writer after each {@code writeBlockSize} documents.
+ */
+public class DocumentExporter implements Tasklet, StepExecutionListener {
+
+  // Set once execute(...) has run to the end without an exception.
+  private boolean complete = false;
+  private final ItemStreamReader<Document> documentReader;
+  private final DocumentDestination documentDestination;
+  private final int writeBlockSize;
+
+  /**
+   * @param documentReader the source of the documents to export
+   * @param documentDestination opens a writer for each block of documents
+   * @param writeBlockSize the number of documents written to one writer before it is closed
+   */
+  public DocumentExporter(ItemStreamReader<Document> documentReader, DocumentDestination documentDestination, int writeBlockSize) {
+    this.documentReader = documentReader;
+    this.documentDestination = documentDestination;
+    this.writeBlockSize = writeBlockSize;
+  }
+
+  @Override
+  public void beforeStep(StepExecution stepExecution) {
+
+  }
+
+  /**
+   * @return {@link ExitStatus#COMPLETED} when the export ran to the end, {@link ExitStatus#FAILED} otherwise
+   */
+  @Override
+  public ExitStatus afterStep(StepExecution stepExecution) {
+    return complete ? ExitStatus.COMPLETED : ExitStatus.FAILED;
+  }
+
+  /**
+   * Reads documents until the reader returns {@code null} and writes them out in blocks.
+   * After each full block the writer is closed and the reader stores its position in the
+   * step execution context. On failure the writer in progress is reverted and the
+   * exception is rethrown.
+   *
+   * @return always {@link RepeatStatus#FINISHED}
+   * @throws Exception any failure raised while reading or writing
+   */
+  @Override
+  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
+    ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();
+    documentReader.open(executionContext);
+
+    DocumentItemWriter writer = null;
+    int writtenCount = 0;
+    try {
+      Document document;
+      while ((document = documentReader.read()) != null) {
+        if (writer != null && writtenCount >= writeBlockSize) {
+          writer.close();
+          writer = null;
+          writtenCount = 0;
+          documentReader.update(executionContext);
+        }
+
+        // The writer is opened lazily with the first document of the block.
+        if (writer == null)
+          writer = documentDestination.open(document);
+
+        writer.write(document);
+        ++writtenCount;
+      }
+    }
+    catch (Exception e) {
+      if (writer != null) {
+        writer.revert();
+        // Cleared so that the finally block does not close a reverted writer.
+        writer = null;
+      }
+      throw e;
+    }
+    finally {
+      try {
+        if (writer != null)
+          writer.close();
+      }
+      finally {
+        // Release the reader even when closing the last writer fails.
+        documentReader.close();
+      }
+    }
+
+    complete = true;
+    return RepeatStatus.FINISHED;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java
new file mode 100644
index 0000000..a4378a4
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.springframework.batch.item.ExecutionContext;
+import org.springframework.batch.item.ItemStreamException;
+import org.springframework.batch.item.support.AbstractItemStreamItemReader;
+import org.springframework.batch.repeat.CompletionPolicy;
+import org.springframework.batch.repeat.RepeatContext;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.batch.repeat.context.RepeatContextSupport;
+import org.springframework.util.ClassUtils;
+
+/**
+ * Item reader that pulls documents from a {@link DocumentSource} through iterators opened
+ * for {@code readBlockSize} documents at a time. It also acts as a {@link CompletionPolicy}
+ * that reports completion once the source has no more documents.
+ */
+public class DocumentItemReader extends AbstractItemStreamItemReader<Document> implements CompletionPolicy {
+
+  /** Execution context key under which the restart position is stored. */
+  public final static String POSITION = "last-read";
+
+  private final DocumentSource documentSource;
+  private final int readBlockSize;
+
+  private DocumentIterator documentIterator = null;
+  // Number of next() calls made on the current iterator, including a call that returned null.
+  private int count = 0;
+  // True when the latest read() found no more documents.
+  private boolean eof = false;
+  // The document returned by the latest read(), and the one returned by the read() before that.
+  private Document current = null;
+  private Document previous = null;
+
+  /**
+   * @param documentSource the source the iterators are opened on
+   * @param readBlockSize the block size passed to the source whenever an iterator is opened
+   */
+  public DocumentItemReader(DocumentSource documentSource, int readBlockSize) {
+    this.documentSource = documentSource;
+    this.readBlockSize = readBlockSize;
+    setName(ClassUtils.getShortName(DocumentItemReader.class));
+  }
+
+  /**
+   * Returns the next document, opening a new iterator that continues from the current
+   * document when the present iterator is exhausted.
+   *
+   * @return the next document, or {@code null} when there are no more documents
+   */
+  @Override
+  public Document read() throws Exception {
+    if (documentIterator == null)
+      openStream();
+    Document next = getNext();
+    // The iterator ran dry only after next() was called more than readBlockSize times,
+    // so there may be further documents: open the following block and try again.
+    if (next == null && count > readBlockSize) {
+      openStream();
+      next = getNext();
+    }
+    eof = next == null;
+    if (eof && documentIterator != null)
+      documentIterator.close();
+
+    previous = current;
+    current = next;
+    return current;
+  }
+
+  // Fetches the next document from the current iterator, counting the call.
+  private Document getNext() {
+    ++count;
+    return documentIterator.next();
+  }
+
+  // Closes the current iterator, if any, and opens a new one positioned by the current document.
+  private void openStream() {
+    closeStream();
+    documentIterator = documentSource.open(current, readBlockSize);
+    count = 0;
+  }
+
+  // Closes and forgets the current iterator; a failure to close is rethrown unchecked.
+  private void closeStream() {
+    if (documentIterator == null)
+      return;
+    try {
+      documentIterator.close();
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    documentIterator = null;
+  }
+
+  /**
+   * Resets the reader state and, when the execution context holds a stored position,
+   * continues from that document.
+   */
+  @Override
+  public void open(ExecutionContext executionContext) {
+    super.open(executionContext);
+    current = null;
+    previous = null;
+    eof = false;
+    documentIterator = null;
+    if (!executionContext.containsKey(POSITION))
+      return;
+
+    current = (Document) executionContext.get(POSITION);
+  }
+
+  /**
+   * Stores the document returned by the read before the latest one as the restart
+   * position; nothing is stored while no such document exists.
+   */
+  @Override
+  public void update(ExecutionContext executionContext) throws ItemStreamException {
+    super.update(executionContext);
+    if (previous != null)
+      executionContext.put(POSITION, previous);
+  }
+
+  /** Closes the current iterator, if any. */
+  @Override
+  public void close() {
+    closeStream();
+  }
+
+  /** @return {@code true} once the latest read found no more documents */
+  @Override
+  public boolean isComplete(RepeatContext context, RepeatStatus result) {
+    return eof;
+  }
+
+  /** @return {@code true} once the latest read found no more documents */
+  @Override
+  public boolean isComplete(RepeatContext context) {
+    return eof;
+  }
+
+  /** @return a new repeat context with the given parent */
+  @Override
+  public RepeatContext start(RepeatContext parent) {
+    return new RepeatContextSupport(parent);
+  }
+
+  /** Marks the repeat context as complete once the end of the documents has been reached. */
+  @Override
+  public void update(RepeatContext context) {
+    if (eof)
+      context.setCompleteOnly();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemWriter.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemWriter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemWriter.java
new file mode 100644
index 0000000..e96f6f1
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemWriter.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+/**
+ * Destination that exported documents are written to.
+ */
+public interface DocumentItemWriter {
+  /** Writes a single document to the destination. */
+  void write(Document document);
+  /** Abandons the output; LocalDocumentItemWriter, for example, closes and deletes its output file. */
+  void revert();
+  /** Finishes writing; LocalDocumentItemWriter, for example, closes its file and passes it to a FileAction. */
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java
new file mode 100644
index 0000000..6232cfc
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import java.util.Iterator;
+
+// TODO: generic closeable iterator
+/**
+ * Iterator over {@link Document}s that can also be closed, so an implementation may release
+ * whatever it holds (SolrDocumentIterator, for example, closes its Solr client).
+ */
+public interface DocumentIterator extends Iterator<Document>, AutoCloseable {
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java
new file mode 100644
index 0000000..c9871a3
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+// TODO: generic object source
+/**
+ * Source from which documents can be read through a {@link DocumentIterator}.
+ */
+public interface DocumentSource {
+  /**
+   * Opens an iterator over documents.
+   *
+   * @param current document handed to the source as the current position; SolrDocumentSource passes it
+   *                to its query builder, which also accepts null.
+   *                NOTE(review): the exact paging semantics depend on the configured filter query - confirm.
+   * @param rows number of rows requested; SolrDocumentSource sets it as the row limit of the query
+   * @return an iterator over the documents that were found
+   */
+  DocumentIterator open(Document current, int rows);
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java
new file mode 100644
index 0000000..26a8c63
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import java.io.File;
+
+/**
+ * An operation carried out on a file, for example compressing it (TarGzCompressor)
+ * or uploading it (S3Uploader).
+ */
+public interface FileAction {
+  /**
+   * Performs the action on the given file.
+   *
+   * @param inputFile the file to process
+   * @return the file resulting from the action; TarGzCompressor returns the archive it created,
+   *         S3Uploader returns the input file itself
+   */
+  File perform(File inputFile);
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java
new file mode 100644
index 0000000..02d898d
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.IOUtils;
+
+import java.io.*;
+
+/**
+ * {@link DocumentItemWriter} that writes every document as one line of JSON into a local,
+ * UTF-8 encoded file and runs a {@link FileAction} on that file when the writer is closed.
+ */
+public class LocalDocumentItemWriter implements DocumentItemWriter {
+  private static final ObjectMapper json = new ObjectMapper();
+  private static final String ENCODING = "UTF-8";
+
+  private final File outFile;
+  private final BufferedWriter bufferedWriter;
+  private final FileAction fileAction;
+
+  /**
+   * Opens the output file for writing.
+   *
+   * @param outFile file the documents are written into
+   * @param fileAction action performed on the file when the writer is closed
+   * @throws UncheckedIOException if the file cannot be opened for writing
+   */
+  public LocalDocumentItemWriter(File outFile, FileAction fileAction) {
+    this.outFile = outFile;
+    this.fileAction = fileAction;
+    try {
+      FileOutputStream fileStream = new FileOutputStream(outFile);
+      this.bufferedWriter = new BufferedWriter(new OutputStreamWriter(fileStream, ENCODING));
+    } catch (FileNotFoundException e) {
+      throw new UncheckedIOException(e);
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Serializes the document to JSON and appends it to the file followed by a line separator.
+   *
+   * @throws UncheckedIOException if serialization or writing fails
+   */
+  @Override
+  public void write(Document document) {
+    try {
+      String serialized = json.writeValueAsString(document);
+      bufferedWriter.write(serialized);
+      bufferedWriter.newLine();
+    }
+    catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Closes the writer, ignoring any failure, and deletes the output file.
+   */
+  @Override
+  public void revert() {
+    IOUtils.closeQuietly(bufferedWriter);
+    outFile.delete();
+  }
+
+  /**
+   * Closes the writer and then performs the file action on the output file.
+   * The action is not performed when closing the writer fails.
+   *
+   * @throws UncheckedIOException if closing the writer fails
+   */
+  @Override
+  public void close() {
+    try {
+      bufferedWriter.close();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+    fileAction.perform(outFile);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java
new file mode 100644
index 0000000..495401d
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.hibernate.validator.constraints.NotBlank;
+
+/**
+ * Settings needed to upload to S3: access key, secret key, object key prefix and bucket name.
+ * Every property is annotated with {@code @NotBlank}.
+ */
+public class S3Properties {
+  @NotBlank
+  private String accessKey;
+  @NotBlank
+  private String secretKey;
+  @NotBlank
+  private String keyPrefix;
+  @NotBlank
+  private String bucketName;
+
+  /** @return the access key */
+  public String getAccessKey() {
+    return this.accessKey;
+  }
+
+  /** @param accessKey the access key */
+  public void setAccessKey(String accessKey) {
+    this.accessKey = accessKey;
+  }
+
+  /** @return the secret key */
+  public String getSecretKey() {
+    return this.secretKey;
+  }
+
+  /** @param secretKey the secret key */
+  public void setSecretKey(String secretKey) {
+    this.secretKey = secretKey;
+  }
+
+  /** @return the prefix put in front of object keys */
+  public String getKeyPrefix() {
+    return this.keyPrefix;
+  }
+
+  /** @param keyPrefix the prefix put in front of object keys */
+  public void setKeyPrefix(String keyPrefix) {
+    this.keyPrefix = keyPrefix;
+  }
+
+  /** @return the name of the bucket */
+  public String getBucketName() {
+    return this.bucketName;
+  }
+
+  /** @param bucketName the name of the bucket */
+  public void setBucketName(String bucketName) {
+    this.bucketName = bucketName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java
new file mode 100644
index 0000000..3214e50
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java
@@ -0,0 +1,51 @@
+package org.apache.ambari.infra.job.archive;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+
+import java.io.File;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * {@link FileAction} that uploads a file into an S3 bucket. The object key is the configured
+ * key prefix followed by the name of the file.
+ */
+public class S3Uploader implements FileAction {
+
+  private final AmazonS3Client client;
+  private final String keyPrefix;
+  private final String bucketName;
+
+  /**
+   * Creates the S3 client from the access key and secret key found in the properties.
+   *
+   * @param s3Properties supplies the credentials, the bucket name and the key prefix
+   */
+  public S3Uploader(S3Properties s3Properties) {
+    this.keyPrefix = s3Properties.getKeyPrefix();
+    this.bucketName = s3Properties.getBucketName();
+    BasicAWSCredentials credentials = new BasicAWSCredentials(s3Properties.getAccessKey(), s3Properties.getSecretKey());
+    client = new AmazonS3Client(credentials);
+  }
+
+  /**
+   * Uploads the file unless an object with the same key is already present in the bucket.
+   *
+   * @param inputFile the file to upload
+   * @return the input file itself
+   * @throws IllegalStateException if an object with the computed key already exists
+   */
+  @Override
+  public File perform(File inputFile) {
+    String key = keyPrefix + inputFile.getName();
+
+    // Fail only this upload: terminating the JVM from here would take down the whole
+    // process that hosts the job, and would do so with a "success" exit code.
+    if (client.doesObjectExist(bucketName, key)) {
+      throw new IllegalStateException(
+              String.format("Object '%s' already exists in bucket '%s'", key, bucketName));
+    }
+
+    client.putObject(bucketName, key, inputFile);
+    return inputFile;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java
new file mode 100644
index 0000000..db4069b
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.TimeZone;
+
+/**
+ * {@link DocumentIterator} over the results of a Solr query response. Every Solr document is
+ * converted into a {@link Document} whose field values are strings.
+ */
+public class SolrDocumentIterator implements DocumentIterator {
+
+  // SimpleDateFormat is not thread-safe and this instance is shared by all iterators:
+  // every use must hold the lock on it (see toString).
+  private static final DateFormat SOLR_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");
+
+  static {
+    SOLR_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
+  }
+
+  private final Iterator<SolrDocument> documentIterator;
+  private final CloudSolrClient client;
+
+
+  /**
+   * @param response query response whose results are iterated
+   * @param client the Solr client, closed when this iterator is closed
+   */
+  public SolrDocumentIterator(QueryResponse response, CloudSolrClient client) {
+    documentIterator = response.getResults().iterator();
+    this.client = client;
+  }
+
+  /**
+   * Returns the next document with all of its field values converted to strings.
+   *
+   * @return the next document, or null when there are no more results (no
+   *         NoSuchElementException is thrown)
+   */
+  @Override
+  public Document next() {
+    if (!documentIterator.hasNext())
+      return null;
+
+    SolrDocument document = documentIterator.next();
+    HashMap<String, String> fieldMap = new HashMap<>();
+    for (String key : document.getFieldNames()) {
+      fieldMap.put(key, toString(document.get(key)));
+    }
+
+    return new Document(fieldMap);
+  }
+
+  /**
+   * Converts a field value to text: null stays null, dates are formatted in UTC with the
+   * pattern yyyy-MM-dd'T'HH:mm:ss.SSSX, anything else uses its own toString().
+   */
+  private String toString(Object value) {
+    if (value == null) {
+      return null;
+    }
+    else if (value instanceof Date) {
+      // Guard the shared formatter: concurrent format() calls can corrupt its internal state.
+      synchronized (SOLR_DATE_FORMAT) {
+        return SOLR_DATE_FORMAT.format(value);
+      }
+    }
+    else {
+      return value.toString();
+    }
+  }
+
+  /**
+   * Closes the Solr client.
+   *
+   * @throws UncheckedIOException if closing the client fails
+   */
+  @Override
+  public void close() {
+    try {
+      client.close();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return documentIterator.hasNext();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentSource.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentSource.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentSource.java
new file mode 100644
index 0000000..2181ba3
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentSource.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.format.DateTimeFormatter;
+
+/**
+ * {@link DocumentSource} that reads documents from a Solr collection through a
+ * {@link CloudSolrClient} connected via the given ZooKeeper host string.
+ */
+public class SolrDocumentSource implements DocumentSource {
+  public static final DateTimeFormatter SOLR_DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX");
+  private static final Logger LOG = LoggerFactory.getLogger(SolrDocumentSource.class);
+
+  private final String zkHost;
+  private final SolrQueryProperties properties;
+  private final String endValue;
+
+  /**
+   * @param zkHost ZooKeeper host string handed to the Solr client builder
+   * @param properties collection name and query templates
+   * @param endValue value substituted for the end placeholder of the query templates
+   */
+  public SolrDocumentSource(String zkHost, SolrQueryProperties properties, String endValue) {
+    this.zkHost = zkHost;
+    this.properties = properties;
+    this.endValue = endValue;
+  }
+
+  /**
+   * Creates a Solr client, executes the configured query and returns an iterator over the results.
+   * On success the returned iterator owns the client and closes it when it is closed itself;
+   * on failure the client is closed here before the exception is propagated.
+   *
+   * @param current document handed to the query builder, may be null
+   * @param rows row limit of the query
+   * @return iterator over the documents returned by the query
+   * @throws RuntimeException if Solr reports a server error
+   * @throws UncheckedIOException if communication with Solr fails
+   */
+  @Override
+  public DocumentIterator open(Document current, int rows) {
+    CloudSolrClient client = new CloudSolrClient.Builder().withZkHost(zkHost).build();
+    try {
+      client.setDefaultCollection(properties.getCollection());
+
+      SolrQuery query = properties.toQueryBuilder()
+              .setEndValue(endValue)
+              .setDocument(current)
+              .build();
+      query.setRows(rows);
+
+      LOG.info("Executing solr query {}", query.toLocalParamsString());
+
+      QueryResponse response = client.query(query);
+      return new SolrDocumentIterator(response, client);
+    } catch (SolrServerException e) {
+      closeAfterFailure(client, e);
+      throw new RuntimeException(e);
+    } catch (IOException e) {
+      closeAfterFailure(client, e);
+      throw new UncheckedIOException(e);
+    } catch (RuntimeException e) {
+      closeAfterFailure(client, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Closes the client after a failed attempt to open an iterator. A failure to close is attached
+   * to the original exception as suppressed so that it does not hide the original problem.
+   */
+  private static void closeAfterFailure(CloudSolrClient client, Exception cause) {
+    try {
+      client.close();
+    } catch (IOException | RuntimeException closeFailure) {
+      cause.addSuppressed(closeFailure);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryBuilder.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryBuilder.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryBuilder.java
new file mode 100644
index 0000000..d0f6d40
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryBuilder.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.apache.solr.client.solrj.SolrQuery;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.solr.client.solrj.SolrQuery.ORDER.asc;
+
+/**
+ * Builds {@link SolrQuery} objects from a query template and an optional filter query template.
+ * The text ${end} is replaced with the end value in both templates; other ${name} placeholders
+ * (lower case letters only) of the filter query are replaced with field values of the document.
+ */
+public class SolrQueryBuilder {
+
+  public static final Pattern PARAMETER_PATTERN = Pattern.compile("\\$\\{[a-z]+\\}");
+
+  private String queryText;
+  private String endValue;
+  private String filterQueryText;
+  private Document document;
+  private String[] sortFields;
+
+  /** Creates a builder whose query text is initially "*:*". */
+  public SolrQueryBuilder() {
+    this.queryText = "*:*";
+  }
+
+  /** Sets the query template. */
+  public SolrQueryBuilder setQueryText(String queryText) {
+    this.queryText = queryText;
+    return this;
+  }
+
+  /** Sets the value substituted for ${end}; with null no substitution takes place. */
+  public SolrQueryBuilder setEndValue(String endValue) {
+    this.endValue = endValue;
+    return this;
+  }
+
+  /** Sets the filter query template; with null no filter query is added. */
+  public SolrQueryBuilder setFilterQueryText(String filterQueryText) {
+    this.filterQueryText = filterQueryText;
+    return this;
+  }
+
+
+  /** Sets the document whose field values fill the placeholders of the filter query. */
+  public SolrQueryBuilder setDocument(Document document) {
+    this.document = document;
+    return this;
+  }
+
+  /** Sets the fields to sort by in ascending order, replacing any fields set earlier. */
+  public SolrQueryBuilder addSort(String... sortBy) {
+    this.sortFields = sortBy;
+    return this;
+  }
+
+  /**
+   * Creates the query. The filter query is only applied when either no document is set and the
+   * filter has no placeholders, or a document is set and the filter has placeholders.
+   * Placeholders for which the document has no value are left as they are.
+   *
+   * @return the assembled query
+   */
+  public SolrQuery build() {
+    SolrQuery solrQuery = new SolrQuery();
+    solrQuery.setQuery(setEndValueOn(queryText));
+
+    if (filterQueryText != null) {
+      String filterQuery = setEndValueOn(filterQueryText);
+      Set<String> paramNames = collectParamNames(filterQuery);
+      boolean hasDocument = document != null;
+
+      if (hasDocument) {
+        for (String name : paramNames) {
+          String value = document.get(name);
+          if (value != null)
+            filterQuery = filterQuery.replace("${" + name + "}", value);
+        }
+      }
+
+      // Exactly one of "no document" and "has placeholders" must hold for the filter to be used.
+      if (hasDocument != paramNames.isEmpty())
+        solrQuery.setFilterQueries(filterQuery);
+    }
+
+    if (sortFields != null) {
+      for (String sortField : sortFields)
+        solrQuery.addSort(sortField, asc);
+    }
+
+    return solrQuery;
+  }
+
+  /** Replaces ${end} with the end value, or returns the text unchanged when no end value is set. */
+  private String setEndValueOn(String query) {
+    return endValue == null ? query : query.replace("${end}", endValue);
+  }
+
+  /** Collects the names of all ${name} placeholders found in the text. */
+  private Set<String> collectParamNames(String filterQuery) {
+    Set<String> names = new HashSet<>();
+    Matcher matcher = PARAMETER_PATTERN.matcher(filterQuery);
+    while (matcher.find()) {
+      // A match always has the form ${name}: drop the leading "${" and the trailing "}".
+      String placeholder = matcher.group();
+      names.add(placeholder.substring(2, placeholder.length() - 1));
+    }
+    return names;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryProperties.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryProperties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryProperties.java
new file mode 100644
index 0000000..444a15b
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryProperties.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.hibernate.validator.constraints.NotBlank;
+
+/**
+ * Settings of a Solr query: collection name, query text, optional filter query text and sort fields.
+ * Collection and query text are annotated with {@code @NotBlank}.
+ */
+public class SolrQueryProperties {
+  @NotBlank
+  private String collection;
+  @NotBlank
+  private String queryText;
+  private String filterQueryText;
+  private String[] sort;
+
+  /** @return the name of the collection */
+  public String getCollection() {
+    return this.collection;
+  }
+
+  /** @param collection the name of the collection */
+  public void setCollection(String collection) {
+    this.collection = collection;
+  }
+
+  /** @return the query text */
+  public String getQueryText() {
+    return this.queryText;
+  }
+
+  /** @param queryText the query text */
+  public void setQueryText(String queryText) {
+    this.queryText = queryText;
+  }
+
+  /** @return the filter query text, may be null */
+  public String getFilterQueryText() {
+    return this.filterQueryText;
+  }
+
+  /** @param filterQueryText the filter query text */
+  public void setFilterQueryText(String filterQueryText) {
+    this.filterQueryText = filterQueryText;
+  }
+
+  /** @return the sort fields, may be null */
+  public String[] getSort() {
+    return this.sort;
+  }
+
+  /** @param sort the sort fields */
+  public void setSort(String[] sort) {
+    this.sort = sort;
+  }
+
+  /**
+   * Creates a query builder initialized with the query text, the filter query text and the sort fields.
+   *
+   * @return a new builder
+   */
+  public SolrQueryBuilder toQueryBuilder() {
+    SolrQueryBuilder builder = new SolrQueryBuilder();
+    builder.setQueryText(queryText);
+    builder.setFilterQueryText(filterQueryText);
+    builder.addSort(sort);
+    return builder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/TarGzCompressor.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/TarGzCompressor.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/TarGzCompressor.java
new file mode 100644
index 0000000..8e34ca9
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/TarGzCompressor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.commons.io.IOUtils;
+
+import java.io.*;
+
+/**
+ * {@link FileAction} that packs a file into a gzip compressed tar archive. The archive is created
+ * next to the input file and has the same name with ".tar.gz" appended.
+ */
+public class TarGzCompressor implements FileAction {
+  /**
+   * Creates the archive with the input file as its single entry.
+   *
+   * @param inputFile the file to compress
+   * @return the archive file
+   * @throws UncheckedIOException if reading the input or writing the archive fails
+   */
+  @Override
+  public File perform(File inputFile) {
+    String entryName = inputFile.getName();
+    File archive = new File(inputFile.getParent(), entryName + ".tar.gz");
+
+    try (TarArchiveOutputStream tarOut = new TarArchiveOutputStream(
+            new GzipCompressorOutputStream(new FileOutputStream(archive)))) {
+      TarArchiveEntry entry = new TarArchiveEntry(entryName);
+      entry.setSize(inputFile.length());
+      tarOut.putArchiveEntry(entry);
+
+      try (FileInputStream in = new FileInputStream(inputFile)) {
+        IOUtils.copy(in, tarOut);
+      }
+
+      tarOut.closeArchiveEntry();
+    }
+    catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    return archive;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java
index fc0a4f7..862119a 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.ambari.infra.manager;
 
+import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import org.apache.ambari.infra.model.ExecutionContextResponse;
 import org.apache.ambari.infra.model.JobDetailsResponse;
@@ -28,16 +29,14 @@ import org.apache.ambari.infra.model.JobOperationParams;
 import org.apache.ambari.infra.model.StepExecutionContextResponse;
 import org.apache.ambari.infra.model.StepExecutionInfoResponse;
 import org.apache.ambari.infra.model.StepExecutionProgressResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.batch.admin.history.StepExecutionHistory;
 import org.springframework.batch.admin.service.JobService;
 import org.springframework.batch.admin.service.NoSuchStepExecutionException;
 import org.springframework.batch.admin.web.JobInfo;
 import org.springframework.batch.admin.web.StepExecutionProgress;
-import org.springframework.batch.core.JobExecution;
-import org.springframework.batch.core.JobInstance;
-import org.springframework.batch.core.JobParametersBuilder;
-import org.springframework.batch.core.JobParametersInvalidException;
-import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.*;
 import org.springframework.batch.core.launch.JobExecutionNotRunningException;
 import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException;
 import org.springframework.batch.core.launch.JobOperator;
@@ -54,7 +53,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -64,6 +62,8 @@ import java.util.TimeZone;
 @Named
 public class JobManager {
 
+  private static final Logger LOG = LoggerFactory.getLogger(JobManager.class);
+
   @Inject
   private JobService jobService;
 
@@ -83,9 +83,20 @@ public class JobManager {
   public JobExecutionInfoResponse launchJob(String jobName, String params)
     throws JobParametersInvalidException, JobInstanceAlreadyExistsException, NoSuchJobException,
     JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
-    // TODO: handle params
     JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
-    jobParametersBuilder.addDate("date", new Date());
+    // params is a comma separated list of key=value pairs. Only the first '=' of a pair separates the
+    // key from the value, so a value may contain '='. Every pair is added as a String job parameter.
+    if (params != null) {
+      LOG.info("Parsing parameters of job {} '{}'", jobName, params);
+      // omitEmptyStrings: an empty params string or a stray/trailing comma would otherwise produce an
+      // empty chunk, which the key-value splitter rejects with an IllegalArgumentException.
+      Splitter.on(',')
+              .trimResults()
+              .omitEmptyStrings()
+              .withKeyValueSeparator(Splitter.on('=').limit(2).trimResults())
+              .split(params)
+              .forEach(jobParametersBuilder::addString);
+    }
     return new JobExecutionInfoResponse(jobService.launch(jobName, jobParametersBuilder.toJobParameters()), timeZone);
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties b/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties
index 8162376..7ef70aa 100644
--- a/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties
+++ b/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties
@@ -18,3 +18,15 @@ infra-manager.batch.db.username=admin
 infra-manager.batch.db.password=admin
 management.security.enabled=false
 management.health.solr.enabled=false
+infra-manager.server.data.folder=/tmp
+
+infra-manager.jobs.solr_data_export.zoo_keeper_socket=zookeeper:2181
+infra-manager.jobs.solr_data_export.read_block_size=100
+infra-manager.jobs.solr_data_export.write_block_size=150
+infra-manager.jobs.solr_data_export.file_name_suffix_column=logtime
+infra-manager.jobs.solr_data_export.destination_directory_path=/tmp/ambariInfraManager
+infra-manager.jobs.solr_data_export.query.collection=hadoop_logs
+infra-manager.jobs.solr_data_export.query.query_text=logtime:[* TO "${end}"]
+infra-manager.jobs.solr_data_export.query.filter_query_text=(logtime:"${logtime}" AND id:{"${id}" TO *]) OR logtime:{"${logtime}" TO "${end}"]
+infra-manager.jobs.solr_data_export.query.sort[0]=logtime
+infra-manager.jobs.solr_data_export.query.sort[1]=id

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml b/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml
index ad1adcd..9737554 100644
--- a/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml
+++ b/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml
@@ -17,7 +17,7 @@
 -->
 <Configuration monitorinterval="30" status="info" strict="true">
   <Properties>
-    <Property name="logging.file">out/infra-manager.log</Property>
+    <Property name="logging.file">target/log/infra-manager.log</Property>
   </Properties>
   <Appenders>
     <Appender type="Console" name="Console">

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java
new file mode 100644
index 0000000..88fbff0
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.infra.job.archive;
+
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.scope.context.StepContext;
+import org.springframework.batch.item.ExecutionContext;
+import org.springframework.batch.item.ItemStreamReader;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.HashMap;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+
+/**
+ * Unit tests of {@link DocumentExporter} using a mocked reader, destination and writers.
+ * The exporter is constructed with 2 as its last argument (the write block size the test names refer to).
+ */
+@RunWith(EasyMockRunner.class)
+public class DocumentExporterTest extends EasyMockSupport {
+
+  private static final Document DOCUMENT = documentWithId("1");
+
+  @Mock
+  private ItemStreamReader<Document> reader;
+  @Mock
+  private DocumentDestination documentDestination;
+  @Mock
+  private DocumentItemWriter documentItemWriter;
+  @Mock
+  private DocumentItemWriter documentItemWriter2;
+
+  private DocumentExporter documentExporter;
+  private ExecutionContext executionContext;
+  private ChunkContext chunkContext;
+
+  @Before
+  public void setUp() throws Exception {
+    JobExecution jobExecution = new JobExecution(1L);
+    StepExecution stepExecution = new StepExecution("exportDoc", jobExecution);
+    executionContext = stepExecution.getExecutionContext();
+    chunkContext = new ChunkContext(new StepContext(stepExecution));
+    documentExporter = new DocumentExporter(reader, documentDestination, 2);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    verifyAll();
+  }
+
+  @Test
+  public void testNothingToRead() throws Exception {
+    reader.open(executionContext);
+    expect(reader.read()).andReturn(null);
+    reader.close();
+
+    export();
+  }
+
+  @Test
+  public void testWriteLessDocumentsThanWriteBlockSize() throws Exception {
+    reader.open(executionContext);
+    expect(reader.read()).andReturn(DOCUMENT);
+    expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
+    documentItemWriter.write(DOCUMENT);
+    expect(reader.read()).andReturn(null);
+    reader.close();
+    documentItemWriter.close();
+
+    export();
+  }
+
+  @Test
+  public void testWriteMoreDocumentsThanWriteBlockSize() throws Exception {
+    Document second = documentWithId("2");
+    Document third = documentWithId("3");
+
+    reader.open(executionContext);
+    expect(reader.read()).andReturn(DOCUMENT);
+    expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
+    documentItemWriter.write(DOCUMENT);
+    expect(reader.read()).andReturn(second);
+    documentItemWriter.write(second);
+    expect(reader.read()).andReturn(third);
+    // Two documents went to the first writer; the third one is expected in a newly opened writer.
+    documentItemWriter.close();
+    expect(documentDestination.open(third)).andReturn(documentItemWriter2);
+    documentItemWriter2.write(third);
+    expect(reader.read()).andReturn(null);
+    reader.update(executionContext);
+    reader.close();
+    documentItemWriter2.close();
+
+    export();
+  }
+
+  @Test(expected = IOException.class)
+  public void testReadError() throws Exception {
+    reader.open(executionContext);
+    expect(reader.read()).andReturn(DOCUMENT);
+    expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
+    documentItemWriter.write(DOCUMENT);
+    expect(reader.read()).andThrow(new IOException("TEST"));
+    documentItemWriter.revert();
+    reader.close();
+
+    export();
+  }
+
+  @Test(expected = UncheckedIOException.class)
+  public void testWriteError() throws Exception {
+    reader.open(executionContext);
+    expect(reader.read()).andReturn(DOCUMENT);
+    expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
+    documentItemWriter.write(DOCUMENT);
+    expectLastCall().andThrow(new UncheckedIOException(new IOException("TEST")));
+    documentItemWriter.revert();
+    reader.close();
+
+    export();
+  }
+
+  /** Switches every mock to replay state, then runs the exporter under test. */
+  private void export() throws Exception {
+    replayAll();
+    documentExporter.execute(null, chunkContext);
+  }
+
+  /** Creates a document whose only field is the given id. */
+  private static Document documentWithId(String id) {
+    HashMap<String, String> fields = new HashMap<>();
+    fields.put("id", id);
+    return new Document(fields);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentItemReaderTest.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentItemReaderTest.java b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentItemReaderTest.java
new file mode 100644
index 0000000..942713f
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentItemReaderTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.infra.job.archive;
+
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.batch.item.ExecutionContext;
+
+import java.util.HashMap;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNull.nullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests of {@link DocumentItemReader} reading from a mocked {@link DocumentSource}
+ * with a read block size of 2.
+ */
+@RunWith(EasyMockRunner.class)
+public class DocumentItemReaderTest extends EasyMockSupport {
+  private static final Document DOCUMENT = new Document(new HashMap<String, String>() {{ put("id", "1"); }});
+  private static final Document DOCUMENT_2 = new Document(new HashMap<String, String>() {{ put("id", "2"); }});
+  private static final Document DOCUMENT_3 = new Document(new HashMap<String, String>() {{ put("id", "3"); }});
+  private static final int READ_BLOCK_SIZE = 2;
+
+  private DocumentItemReader documentItemReader;
+  @Mock
+  private DocumentSource documentSource;
+  @Mock
+  private DocumentIterator documentIterator;
+  @Mock
+  private DocumentIterator documentIterator2;
+
+  @Before
+  public void setUp() throws Exception {
+    documentItemReader = new DocumentItemReader(documentSource, READ_BLOCK_SIZE);
+  }
+
+  // Every test has to use up all of the expectations it recorded on the mocks.
+  @After
+  public void tearDown() throws Exception {
+    verifyAll();
+  }
+
+  @Test
+  public void testReadWhenCollectionIsEmpty() throws Exception {
+    expect(documentSource.open(null, 2)).andReturn(documentIterator);
+    expect(documentIterator.next()).andReturn(null);
+    documentIterator.close(); expectLastCall();
+    replayAll();
+
+    assertThat(documentItemReader.read(), is(nullValue()));
+    assertThat(documentItemReader.isComplete(null), is(true));
+    assertThat(documentItemReader.isComplete(null, null), is(true));
+  }
+
+  @Test
+  public void testReadWhenCollectionContainsLessElementsThanReadBlockSize() throws Exception {
+    expect(documentSource.open(null, 2)).andReturn(documentIterator);
+    expect(documentIterator.next()).andReturn(DOCUMENT);
+    expect(documentIterator.next()).andReturn(null);
+    documentIterator.close(); expectLastCall();
+    replayAll();
+
+    assertThat(documentItemReader.read(), is(DOCUMENT));
+    assertThat(documentItemReader.isComplete(null), is(false));
+    assertThat(documentItemReader.isComplete(null, null), is(false));
+    assertThat(documentItemReader.read(), is(nullValue()));
+    assertThat(documentItemReader.isComplete(null), is(true));
+    assertThat(documentItemReader.isComplete(null, null), is(true));
+  }
+
+  @Test
+  public void testReadWhenCollectionContainsExactlySameCountElementsAsReadBlockSize() throws Exception {
+    expect(documentSource.open(null, 2)).andReturn(documentIterator);
+    // After a completely filled block the source is expected to be opened again from the last document read.
+    expect(documentSource.open(DOCUMENT_2, 2)).andReturn(documentIterator2);
+    expect(documentIterator.next()).andReturn(DOCUMENT);
+    expect(documentIterator.next()).andReturn(DOCUMENT_2);
+    expect(documentIterator.next()).andReturn(null);
+    documentIterator.close(); expectLastCall();
+
+    expect(documentIterator2.next()).andReturn(null);
+    documentIterator2.close(); expectLastCall();
+    replayAll();
+
+    assertThat(documentItemReader.read(), is(DOCUMENT));
+    assertThat(documentItemReader.isComplete(null), is(false));
+    assertThat(documentItemReader.isComplete(null, null), is(false));
+    assertThat(documentItemReader.read(), is(DOCUMENT_2));
+    assertThat(documentItemReader.isComplete(null), is(false));
+    assertThat(documentItemReader.isComplete(null, null), is(false));
+    assertThat(documentItemReader.read(), is(nullValue()));
+    assertThat(documentItemReader.isComplete(null), is(true));
+    assertThat(documentItemReader.isComplete(null, null), is(true));
+  }
+
+  @Test
+  public void testReadWhenCollectionContainsMoreElementsThanReadBlockSize() throws Exception {
+    expect(documentSource.open(null, 2)).andReturn(documentIterator);
+    expect(documentSource.open(DOCUMENT_2, 2)).andReturn(documentIterator2);
+    expect(documentIterator.next()).andReturn(DOCUMENT);
+    expect(documentIterator.next()).andReturn(DOCUMENT_2);
+    expect(documentIterator.next()).andReturn(null);
+    documentIterator.close(); expectLastCall();
+    // The second block holds DOCUMENT_3, whose id differs from both documents of the first block.
+    expect(documentIterator2.next()).andReturn(DOCUMENT_3);
+    expect(documentIterator2.next()).andReturn(null);
+    documentIterator2.close(); expectLastCall();
+
+    replayAll();
+
+    assertThat(documentItemReader.read(), is(DOCUMENT));
+    assertThat(documentItemReader.isComplete(null), is(false));
+    assertThat(documentItemReader.isComplete(null, null), is(false));
+
+    assertThat(documentItemReader.read(), is(DOCUMENT_2));
+    assertThat(documentItemReader.isComplete(null), is(false));
+    assertThat(documentItemReader.isComplete(null, null), is(false));
+
+    assertThat(documentItemReader.read(), is(DOCUMENT_3));
+    assertThat(documentItemReader.isComplete(null), is(false));
+    assertThat(documentItemReader.isComplete(null, null), is(false));
+
+    assertThat(documentItemReader.read(), is(nullValue()));
+    assertThat(documentItemReader.isComplete(null), is(true));
+    assertThat(documentItemReader.isComplete(null, null), is(true));
+  }
+
+  @Test
+  public void testContinueWhenOnlyFirstElementWasRead() throws Exception {
+    expect(documentSource.open(null, 2)).andReturn(documentIterator);
+    expect(documentIterator.next()).andReturn(DOCUMENT);
+    documentIterator.close(); expectLastCall();
+    expect(documentSource.open(null, 2)).andReturn(documentIterator2);
+    expect(documentIterator2.next()).andReturn(DOCUMENT);
+    documentIterator2.close(); expectLastCall();
+    replayAll();
+
+    ExecutionContext executionContext = new ExecutionContext();
+    documentItemReader.open(executionContext);
+    assertThat(documentItemReader.read(), is(DOCUMENT));
+    documentItemReader.update(executionContext);
+    // A single read stores no position, so the reader reopened below starts from the beginning again.
+    assertThat(executionContext.containsKey(DocumentItemReader.POSITION), is(false));
+    documentItemReader.close();
+
+    documentItemReader.open(executionContext);
+    assertThat(documentItemReader.read(), is(DOCUMENT));
+    documentItemReader.close();
+  }
+
+  @Test
+  public void testContinueWhenMoreThanOneElementWasRead() throws Exception {
+    expect(documentSource.open(null, 2)).andReturn(documentIterator);
+    expect(documentIterator.next()).andReturn(DOCUMENT);
+    expect(documentIterator.next()).andReturn(DOCUMENT_2);
+    documentIterator.close(); expectLastCall();
+    expect(documentSource.open(DOCUMENT, 2)).andReturn(documentIterator2);
+    expect(documentIterator2.next()).andReturn(DOCUMENT_2);
+    expect(documentIterator2.next()).andReturn(DOCUMENT_3);
+    documentIterator2.close(); expectLastCall();
+
+    replayAll();
+
+    ExecutionContext executionContext = new ExecutionContext();
+    documentItemReader.open(executionContext);
+    assertThat(documentItemReader.read(), is(DOCUMENT));
+    assertThat(documentItemReader.read(), is(DOCUMENT_2));
+    documentItemReader.update(executionContext);
+    // The stored position is the document before the last one read, hence DOCUMENT_2 is read again below.
+    assertThat(executionContext.get(DocumentItemReader.POSITION), is(DOCUMENT));
+    documentItemReader.close();
+
+    documentItemReader.open(executionContext);
+    assertThat(documentItemReader.read(), is(DOCUMENT_2));
+    assertThat(documentItemReader.read(), is(DOCUMENT_3));
+    documentItemReader.close();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriterTest.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriterTest.java b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriterTest.java
new file mode 100644
index 0000000..6411ff1
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriterTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.infra.job.archive;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.FileUtils;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.easymock.EasyMock.expect;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests of {@link LocalDocumentItemWriter} writing into a temporary file.
+ */
+@RunWith(EasyMockRunner.class)
+public class LocalDocumentItemWriterTest extends EasyMockSupport {
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Document DOCUMENT = documentWithId("1");
+  private static final Document DOCUMENT2 = documentWithId("2");
+
+  @Mock
+  private FileAction fileAction;
+  private File outFile;
+  private LocalDocumentItemWriter localDocumentItemWriter;
+
+  @Before
+  public void setUp() throws Exception {
+    outFile = File.createTempFile("LocalDocumentItemWriterTest", "json.tmp");
+    localDocumentItemWriter = new LocalDocumentItemWriter(outFile, fileAction);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    outFile.delete();
+    verifyAll();
+  }
+
+  @Test
+  public void testWrite() throws Exception {
+    expect(fileAction.perform(outFile)).andReturn(outFile);
+    replayAll();
+
+    localDocumentItemWriter.write(DOCUMENT);
+    localDocumentItemWriter.write(DOCUMENT2);
+    localDocumentItemWriter.close();
+
+    List<Document> written = readBack(outFile);
+    assertThat(written.size(), is(2));
+    assertThat(written.get(0).get("id"), is(DOCUMENT.get("id")));
+    assertThat(written.get(1).get("id"), is(DOCUMENT2.get("id")));
+  }
+
+  @Test
+  public void testRevert() throws Exception {
+    replayAll();
+
+    localDocumentItemWriter.write(DOCUMENT);
+    localDocumentItemWriter.revert();
+
+    assertThat(outFile.exists(), is(false));
+  }
+
+  /** Parses every line of the given file as one JSON serialized document. */
+  private List<Document> readBack(File file) throws IOException {
+    List<String> lines = FileUtils.readLines(file);
+    List<Document> documents = new ArrayList<>();
+    for (String json : lines) {
+      documents.add(OBJECT_MAPPER.readValue(json, Document.class));
+    }
+    return documents;
+  }
+
+  /** Creates a document whose only field is the given id. */
+  private static Document documentWithId(String id) {
+    HashMap<String, String> fields = new HashMap<>();
+    fields.put("id", id);
+    return new Document(fields);
+  }
+}
\ No newline at end of file