You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by rl...@apache.org on 2017/11/29 16:58:22 UTC
[05/24] ambari git commit: AMBARI-22514. Initial implementation of
Schedulable document deletion & archiving for Infra Solr (Krisztian Kasa via
oleewere)
AMBARI-22514. Initial implementation of Schedulable document deletion & archiving for Infra Solr (Krisztian Kasa via oleewere)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/393fdb80
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/393fdb80
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/393fdb80
Branch: refs/heads/branch-feature-AMBARI-20859
Commit: 393fdb8048ff579e8a55cd1b477a23d1bf105576
Parents: 2bf3c8e
Author: Krisztian Kasa <ka...@gmail.com>
Authored: Tue Nov 28 15:45:22 2017 +0100
Committer: Oliver Szabo <ol...@gmail.com>
Committed: Tue Nov 28 15:47:59 2017 +0100
----------------------------------------------------------------------
ambari-infra/ambari-infra-manager/pom.xml | 11 ++
.../infra/job/archive/CompositeFileAction.java | 46 +++++
.../ambari/infra/job/archive/Document.java | 54 +++++
.../infra/job/archive/DocumentDestination.java | 23 +++
.../archive/DocumentExportConfiguration.java | 118 +++++++++++
.../job/archive/DocumentExportJobListener.java | 35 ++++
.../job/archive/DocumentExportProperties.java | 112 +++++++++++
.../job/archive/DocumentExportStepListener.java | 47 +++++
.../infra/job/archive/DocumentExporter.java | 99 ++++++++++
.../infra/job/archive/DocumentItemReader.java | 135 +++++++++++++
.../infra/job/archive/DocumentItemWriter.java | 25 +++
.../infra/job/archive/DocumentIterator.java | 25 +++
.../infra/job/archive/DocumentSource.java | 24 +++
.../ambari/infra/job/archive/FileAction.java | 25 +++
.../job/archive/LocalDocumentItemWriter.java | 72 +++++++
.../ambari/infra/job/archive/S3Properties.java | 64 ++++++
.../ambari/infra/job/archive/S3Uploader.java | 51 +++++
.../infra/job/archive/SolrDocumentIterator.java | 90 +++++++++
.../infra/job/archive/SolrDocumentSource.java | 68 +++++++
.../infra/job/archive/SolrQueryBuilder.java | 115 +++++++++++
.../infra/job/archive/SolrQueryProperties.java | 69 +++++++
.../infra/job/archive/TarGzCompressor.java | 50 +++++
.../apache/ambari/infra/manager/JobManager.java | 21 +-
.../src/main/resources/infra-manager.properties | 12 ++
.../src/main/resources/log4j2.xml | 2 +-
.../infra/job/archive/DocumentExporterTest.java | 147 ++++++++++++++
.../job/archive/DocumentItemReaderTest.java | 197 +++++++++++++++++++
.../archive/LocalDocumentItemWriterTest.java | 98 +++++++++
.../infra/job/archive/SolrQueryBuilderTest.java | 113 +++++++++++
.../test-config/logfeeder/logfeeder.properties | 2 +-
30 files changed, 1940 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/pom.xml b/ambari-infra/ambari-infra-manager/pom.xml
index aa86da8..67bf7d1 100644
--- a/ambari-infra/ambari-infra-manager/pom.xml
+++ b/ambari-infra/ambari-infra-manager/pom.xml
@@ -141,6 +141,12 @@
<version>3.4</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
<!-- Spring dependencies -->
<dependency>
<groupId>org.springframework</groupId>
@@ -417,6 +423,11 @@
<groupId>com.google.guava</groupId>
<version>20.0</version>
</dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-s3</artifactId>
+ <version>1.11.5</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java
new file mode 100644
index 0000000..84ce160
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import java.io.File;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+
+public class CompositeFileAction implements FileAction {
+
+ private final List<FileAction> actions;
+
+ public CompositeFileAction(FileAction... actions) {
+ this.actions = asList(actions);
+ }
+
+ public void add(FileAction action) {
+ actions.add(action);
+ }
+
+ @Override
+ public File perform(File inputFile) {
+ File file = inputFile;
+ for (FileAction action : actions) {
+ file = action.perform(file);
+ }
+ return file;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java
new file mode 100644
index 0000000..84f5ece
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import com.fasterxml.jackson.annotation.JsonAnyGetter;
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Collections.unmodifiableMap;
+
+// TODO: create entities for each solr collections
+public class Document {
+ private final Map<String, String> fieldMap;
+
+ private Document() {
+ fieldMap = new HashMap<>();
+ }
+
+ public Document(Map<String, String> fieldMap) {
+ this.fieldMap = unmodifiableMap(fieldMap);
+ }
+
+ public String get(String key) {
+ return fieldMap.get(key);
+ }
+
+ @JsonAnyGetter
+ private Map<String, String> getFieldMap() {
+ return fieldMap;
+ }
+
+ @JsonAnySetter
+ private void put(String key, String value) {
+ fieldMap.put(key, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentDestination.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentDestination.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentDestination.java
new file mode 100644
index 0000000..f647a36
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentDestination.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+public interface DocumentDestination {
+ DocumentItemWriter open(Document firstDocument);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java
new file mode 100644
index 0000000..69f41d3
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.Job;
+import org.springframework.batch.core.Step;
+import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
+import org.springframework.batch.core.configuration.annotation.JobScope;
+import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
+import org.springframework.batch.core.configuration.annotation.StepScope;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.nio.file.Paths;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+
+import static org.apache.ambari.infra.job.archive.SolrDocumentSource.SOLR_DATETIME_FORMATTER;
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+@Configuration
+public class DocumentExportConfiguration {
+ private static final Logger LOG = LoggerFactory.getLogger(DocumentExportConfiguration.class);
+ private static final DateTimeFormatter FILENAME_DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH_mm_ss.SSSX");
+
+ @Inject
+ private DocumentExportProperties properties;
+
+ @Inject
+ private StepBuilderFactory steps;
+
+ @Inject
+ private JobBuilderFactory jobs;
+
+
+
+ @Bean
+ public Job logExportJob(@Qualifier("exportStep") Step logExportStep) {
+ return jobs.get("solr_data_export").listener(new DocumentExportJobListener()).start(logExportStep).build();
+ }
+
+ @Bean
+ @JobScope
+ public Step exportStep(DocumentExporter documentExporter) {
+ return steps.get("export")
+ .tasklet(documentExporter)
+ .listener(new DocumentExportStepListener(properties))
+ .build();
+ }
+
+ @Bean
+ @StepScope
+ public DocumentExporter getDocumentExporter(DocumentItemReader documentItemReader,
+ @Value("#{stepExecution.jobExecution.id}") String jobId) {
+ File path = Paths.get(
+ properties.getDestinationDirectoryPath(),
+ String.format("%s_%s", properties.getQuery().getCollection(), jobId)).toFile(); // TODO: add end date
+ LOG.info("Destination directory path={}", path);
+ if (!path.exists()) {
+ if (!path.mkdirs()) {
+ LOG.warn("Unable to create directory {}", path);
+ }
+ }
+
+ CompositeFileAction fileAction = new CompositeFileAction(new TarGzCompressor());
+
+ return new DocumentExporter(
+ documentItemReader,
+ firstDocument -> new LocalDocumentItemWriter(
+ new File(path, String.format("%s_-_%s.json",
+ properties.getQuery().getCollection(),
+ firstDocument.get(properties.getFileNameSuffixColumn()))),
+ fileAction),
+ properties.getWriteBlockSize());
+ }
+
+ @Bean
+ @StepScope
+ public DocumentItemReader reader(DocumentSource documentSource) {
+ return new DocumentItemReader(documentSource, properties.getReadBlockSize());
+ }
+
+ @Bean
+ @StepScope
+ public DocumentSource logSource(@Value("#{jobParameters[endDate]}") String endDateText) {
+ OffsetDateTime endDate = OffsetDateTime.now(ZoneOffset.UTC);
+ if (!isBlank(endDateText))
+ endDate = OffsetDateTime.parse(endDateText);
+
+ return new SolrDocumentSource(
+ properties.getZooKeeperSocket(),
+ properties.getQuery(),
+ SOLR_DATETIME_FORMATTER.format(endDate));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java
new file mode 100644
index 0000000..f1df46c
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobExecutionListener;
+
+public class DocumentExportJobListener implements JobExecutionListener {
+ @Override
+ public void beforeJob(JobExecution jobExecution) {
+
+ }
+
+ @Override
+ public void afterJob(JobExecution jobExecution) {
+ jobExecution.setExitStatus(new ExitStatus(ExitStatus.COMPLETED.getExitCode()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java
new file mode 100644
index 0000000..d6301c0
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.hibernate.validator.constraints.NotBlank;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.PropertySource;
+
+import javax.validation.constraints.Min;
+
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+@Configuration
+@PropertySource(value = {"classpath:infra-manager.properties"})
+@ConfigurationProperties(prefix = "infra-manager.jobs.solr_data_export")
+public class DocumentExportProperties {
+ @NotBlank
+ private String zooKeeperSocket;
+ @Min(1)
+ private int readBlockSize;
+ @Min(1)
+ private int writeBlockSize;
+ @NotBlank
+ private String destinationDirectoryPath;
+ @NotBlank
+ private String fileNameSuffixColumn;
+ private SolrQueryProperties query;
+
+ public String getZooKeeperSocket() {
+ return zooKeeperSocket;
+ }
+
+ public void setZooKeeperSocket(String zooKeeperSocket) {
+ this.zooKeeperSocket = zooKeeperSocket;
+ }
+
+ public int getReadBlockSize() {
+ return readBlockSize;
+ }
+
+ public void setReadBlockSize(int readBlockSize) {
+ this.readBlockSize = readBlockSize;
+ }
+
+ public int getWriteBlockSize() {
+ return writeBlockSize;
+ }
+
+ public void setWriteBlockSize(int writeBlockSize) {
+ this.writeBlockSize = writeBlockSize;
+ }
+
+ public String getDestinationDirectoryPath() {
+ return destinationDirectoryPath;
+ }
+
+ public void setDestinationDirectoryPath(String destinationDirectoryPath) {
+ this.destinationDirectoryPath = destinationDirectoryPath;
+ }
+
+ public void apply(JobParameters jobParameters) {
+ // TODO: solr query params
+ zooKeeperSocket = jobParameters.getString("zooKeeperSocket", zooKeeperSocket);
+ readBlockSize = getIntJobParameter(jobParameters, "readBlockSize", readBlockSize);
+ writeBlockSize = getIntJobParameter(jobParameters, "writeBlockSize", writeBlockSize);
+ destinationDirectoryPath = jobParameters.getString("destinationDirectoryPath", destinationDirectoryPath);
+ query.setCollection(jobParameters.getString("collection", query.getCollection()));
+ query.setQueryText(jobParameters.getString("queryText", query.getQueryText()));
+ query.setFilterQueryText(jobParameters.getString("filterQueryText", query.getFilterQueryText()));
+ }
+
+ private int getIntJobParameter(JobParameters jobParameters, String parameterName, int defaultValue) {
+ String writeBlockSizeText = jobParameters.getString(parameterName);
+ if (isBlank(writeBlockSizeText))
+ return defaultValue;
+ return this.writeBlockSize = Integer.parseInt(writeBlockSizeText);
+ }
+
+ public String getFileNameSuffixColumn() {
+ return fileNameSuffixColumn;
+ }
+
+ public void setFileNameSuffixColumn(String fileNameSuffixColumn) {
+ this.fileNameSuffixColumn = fileNameSuffixColumn;
+ }
+
+ public SolrQueryProperties getQuery() {
+ return query;
+ }
+
+ public void setQuery(SolrQueryProperties query) {
+ this.query = query;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportStepListener.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportStepListener.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportStepListener.java
new file mode 100644
index 0000000..3bab6d5
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportStepListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.StepExecutionListener;
+
+public class DocumentExportStepListener implements StepExecutionListener {
+ private static final Logger LOG = LoggerFactory.getLogger(DocumentExportStepListener.class);
+
+ private final DocumentExportProperties properties;
+
+ public DocumentExportStepListener(DocumentExportProperties properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ public void beforeStep(StepExecution stepExecution) {
+ properties.apply(stepExecution.getJobParameters());
+ LOG.info("LogExport step - before step execution");
+ }
+
+ @Override
+ public ExitStatus afterStep(StepExecution stepExecution) {
+ LOG.info("LogExport step - after step execution");
+ return stepExecution.getExitStatus();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java
new file mode 100644
index 0000000..6106c20
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.StepExecutionListener;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.item.ExecutionContext;
+import org.springframework.batch.item.ItemStreamReader;
+import org.springframework.batch.repeat.RepeatStatus;
+
+public class DocumentExporter implements Tasklet, StepExecutionListener {
+
+ private boolean complete = false;
+ private final ItemStreamReader<Document> documentReader;
+ private final DocumentDestination documentDestination;
+ private final int writeBlockSize;
+
+ public DocumentExporter(ItemStreamReader<Document> documentReader, DocumentDestination documentDestination, int writeBlockSize) {
+ this.documentReader = documentReader;
+ this.documentDestination = documentDestination;
+ this.writeBlockSize = writeBlockSize;
+ }
+
+ @Override
+ public void beforeStep(StepExecution stepExecution) {
+
+ }
+
+ @Override
+ public ExitStatus afterStep(StepExecution stepExecution) {
+ if (complete) {
+ return ExitStatus.COMPLETED;
+ }
+ else {
+ return ExitStatus.FAILED;
+ }
+ }
+
+ @Override
+ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
+ ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();
+ documentReader.open(executionContext);
+
+ DocumentItemWriter writer = null;
+ int writtenCount = 0;
+ try {
+ Document document;
+ while ((document = documentReader.read()) != null) {
+ if (writer != null && writtenCount >= writeBlockSize) {
+ writer.close();
+ writer = null;
+ writtenCount = 0;
+ documentReader.update(executionContext);
+ }
+
+ if (writer == null)
+ writer = documentDestination.open(document);
+
+ writer.write(document);
+ ++writtenCount;
+ }
+ }
+ catch (Exception e) {
+ if (writer != null) {
+ writer.revert();
+ writer = null;
+ }
+ throw e;
+ }
+ finally {
+ if (writer != null)
+ writer.close();
+ documentReader.close();
+ }
+
+ complete = true;
+ return RepeatStatus.FINISHED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java
new file mode 100644
index 0000000..a4378a4
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.springframework.batch.item.ExecutionContext;
+import org.springframework.batch.item.ItemStreamException;
+import org.springframework.batch.item.support.AbstractItemStreamItemReader;
+import org.springframework.batch.repeat.CompletionPolicy;
+import org.springframework.batch.repeat.RepeatContext;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.batch.repeat.context.RepeatContextSupport;
+import org.springframework.util.ClassUtils;
+
+public class DocumentItemReader extends AbstractItemStreamItemReader<Document> implements CompletionPolicy {
+
+ public final static String POSITION = "last-read";
+
+ private final DocumentSource documentSource;
+ private final int readBlockSize;
+
+ private DocumentIterator documentIterator = null;
+ private int count = 0;
+ private boolean eof = false;
+ private Document current = null;
+ private Document previous = null;
+
+ public DocumentItemReader(DocumentSource documentSource, int readBlockSize) {
+ this.documentSource = documentSource;
+ this.readBlockSize = readBlockSize;
+ setName(ClassUtils.getShortName(DocumentItemReader.class));
+ }
+
+ @Override
+ public Document read() throws Exception {
+ if (documentIterator == null)
+ openStream();
+ Document next = getNext();
+ if (next == null && count > readBlockSize) {
+ openStream();
+ next = getNext();
+ }
+ eof = next == null;
+ if (eof && documentIterator != null)
+ documentIterator.close();
+
+ previous = current;
+ current = next;
+ return current;
+ }
+
+ private Document getNext() {
+ ++count;
+ return documentIterator.next();
+ }
+
+ private void openStream() {
+ closeStream();
+ documentIterator = documentSource.open(current, readBlockSize);
+ count = 0;
+ }
+
+ private void closeStream() {
+ if (documentIterator == null)
+ return;
+ try {
+ documentIterator.close();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ documentIterator = null;
+ }
+
+ @Override
+ public void open(ExecutionContext executionContext) {
+ super.open(executionContext);
+ current = null;
+ previous = null;
+ eof = false;
+ documentIterator = null;
+ if (!executionContext.containsKey(POSITION))
+ return;
+
+ current = (Document) executionContext.get(POSITION);
+ }
+
+ @Override
+ public void update(ExecutionContext executionContext) throws ItemStreamException {
+ super.update(executionContext);
+ if (previous != null)
+ executionContext.put(POSITION, previous);
+ }
+
+ @Override
+ public void close() {
+ closeStream();
+ }
+
+ @Override
+ public boolean isComplete(RepeatContext context, RepeatStatus result) {
+ return eof;
+ }
+
+ @Override
+ public boolean isComplete(RepeatContext context) {
+ return eof;
+ }
+
+ @Override
+ public RepeatContext start(RepeatContext parent) {
+ return new RepeatContextSupport(parent);
+ }
+
+ @Override
+ public void update(RepeatContext context) {
+ if (eof)
+ context.setCompleteOnly();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemWriter.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemWriter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemWriter.java
new file mode 100644
index 0000000..e96f6f1
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemWriter.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+public interface DocumentItemWriter {
+ void write(Document document);
+ void revert();
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java
new file mode 100644
index 0000000..6232cfc
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import java.util.Iterator;
+
+// TODO: generic closeable iterator
+public interface DocumentIterator extends Iterator<Document>, AutoCloseable {
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java
new file mode 100644
index 0000000..c9871a3
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+// TODO: generic object source
+public interface DocumentSource {
+ DocumentIterator open(Document current, int rows);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java
new file mode 100644
index 0000000..26a8c63
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import java.io.File;
+
+public interface FileAction {
+ File perform(File inputFile);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java
new file mode 100644
index 0000000..02d898d
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.IOUtils;
+
+import java.io.*;
+
+public class LocalDocumentItemWriter implements DocumentItemWriter {
+ private static final ObjectMapper json = new ObjectMapper();
+ private static final String ENCODING = "UTF-8";
+
+ private final File outFile;
+ private final BufferedWriter bufferedWriter;
+ private final FileAction fileAction;
+
+ public LocalDocumentItemWriter(File outFile, FileAction fileAction) {
+ this.fileAction = fileAction;
+ this.outFile = outFile;
+ try {
+ this.bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(outFile), ENCODING));
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ } catch (FileNotFoundException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Override
+ public void write(Document document) {
+ try {
+ bufferedWriter.write(json.writeValueAsString(document));
+ bufferedWriter.newLine();
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Override
+ public void revert() {
+ IOUtils.closeQuietly(bufferedWriter);
+ outFile.delete();
+ }
+
+ @Override
+ public void close() {
+ try {
+ bufferedWriter.close();
+ fileAction.perform(outFile);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java
new file mode 100644
index 0000000..495401d
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.hibernate.validator.constraints.NotBlank;
+
+public class S3Properties {
+ @NotBlank
+ private String accessKey;
+ @NotBlank
+ private String secretKey;
+ @NotBlank
+ private String keyPrefix;
+ @NotBlank
+ private String bucketName;
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ public String getSecretKey() {
+ return secretKey;
+ }
+
+ public String getKeyPrefix() {
+ return keyPrefix;
+ }
+
+ public String getBucketName() {
+ return bucketName;
+ }
+
+ public void setAccessKey(String accessKey) {
+ this.accessKey = accessKey;
+ }
+
+ public void setSecretKey(String secretKey) {
+ this.secretKey = secretKey;
+ }
+
+ public void setKeyPrefix(String keyPrefix) {
+ this.keyPrefix = keyPrefix;
+ }
+
+ public void setBucketName(String bucketName) {
+ this.bucketName = bucketName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java
new file mode 100644
index 0000000..3214e50
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java
@@ -0,0 +1,51 @@
+package org.apache.ambari.infra.job.archive;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+
+import java.io.File;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+public class S3Uploader implements FileAction {
+
+ private final AmazonS3Client client;
+ private final String keyPrefix;
+ private final String bucketName;
+
+ public S3Uploader(S3Properties s3Properties) {
+ this.keyPrefix = s3Properties.getKeyPrefix();
+ this.bucketName = s3Properties.getBucketName();
+ BasicAWSCredentials credentials = new BasicAWSCredentials(s3Properties.getAccessKey(), s3Properties.getSecretKey());
+ client = new AmazonS3Client(credentials);
+ }
+
+ @Override
+ public File perform(File inputFile) {
+ String key = keyPrefix + inputFile.getName();
+
+ if (client.doesObjectExist(bucketName, key)) {
+ System.out.println("Object '" + key + "' already exists");
+ System.exit(0);
+ }
+
+ client.putObject(bucketName, key, inputFile);
+ return inputFile;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java
new file mode 100644
index 0000000..db4069b
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.TimeZone;
+
+public class SolrDocumentIterator implements DocumentIterator {
+
+ private static final DateFormat SOLR_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");
+
+ static {
+ SOLR_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
+ }
+
+ private final Iterator<SolrDocument> documentIterator;
+ private final CloudSolrClient client;
+
+
+ public SolrDocumentIterator(QueryResponse response, CloudSolrClient client) {
+ documentIterator = response.getResults().iterator();
+ this.client = client;
+ }
+
+ @Override
+ public Document next() {
+ if (!documentIterator.hasNext())
+ return null;
+
+ SolrDocument document = documentIterator.next();
+ HashMap<String, String> fieldMap = new HashMap<>();
+ for (String key : document.getFieldNames()) {
+ fieldMap.put(key, toString(document.get(key)));
+ }
+
+ return new Document(fieldMap);
+ }
+
+ private String toString(Object value) {
+ if (value == null) {
+ return null;
+ }
+ else if (value instanceof Date) {
+ return SOLR_DATE_FORMAT.format(value);
+ }
+ else {
+ return value.toString();
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ client.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return documentIterator.hasNext();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentSource.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentSource.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentSource.java
new file mode 100644
index 0000000..2181ba3
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentSource.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.format.DateTimeFormatter;
+
+public class SolrDocumentSource implements DocumentSource {
+ public static final DateTimeFormatter SOLR_DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX");
+ private static final Logger LOG = LoggerFactory.getLogger(SolrDocumentSource.class);
+
+ private final String zkHost;
+ private final SolrQueryProperties properties;
+ private final String endValue;
+
+ public SolrDocumentSource(String zkHost, SolrQueryProperties properties, String endValue) {
+ this.zkHost = zkHost;
+ this.properties = properties;
+ this.endValue = endValue;
+ }
+
+ @Override
+ public DocumentIterator open(Document current, int rows) {
+ CloudSolrClient client = new CloudSolrClient.Builder().withZkHost(zkHost).build();
+ client.setDefaultCollection(properties.getCollection());
+
+ SolrQuery query = properties.toQueryBuilder()
+ .setEndValue(endValue)
+ .setDocument(current)
+ .build();
+ query.setRows(rows);
+
+ LOG.info("Executing solr query {}", query.toLocalParamsString());
+
+ try {
+ QueryResponse response = client.query(query);
+ return new SolrDocumentIterator(response, client);
+ } catch (SolrServerException e) {
+ throw new RuntimeException(e);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryBuilder.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryBuilder.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryBuilder.java
new file mode 100644
index 0000000..d0f6d40
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryBuilder.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.apache.solr.client.solrj.SolrQuery;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.solr.client.solrj.SolrQuery.ORDER.asc;
+
+public class SolrQueryBuilder {
+
+ public static final Pattern PARAMETER_PATTERN = Pattern.compile("\\$\\{[a-z]+\\}");
+
+ private String queryText;
+ private String endValue;
+ private String filterQueryText;
+ private Document document;
+ private String[] sortFields;
+
+ public SolrQueryBuilder() {
+ this.queryText = "*:*";
+ }
+
+ public SolrQueryBuilder setQueryText(String queryText) {
+ this.queryText = queryText;
+ return this;
+ }
+
+ public SolrQueryBuilder setEndValue(String endValue) {
+ this.endValue = endValue;
+ return this;
+ }
+
+ public SolrQueryBuilder setFilterQueryText(String filterQueryText) {
+ this.filterQueryText = filterQueryText;
+ return this;
+ }
+
+
+ public SolrQueryBuilder setDocument(Document document) {
+ this.document = document;
+ return this;
+ }
+
+ public SolrQueryBuilder addSort(String... sortBy) {
+ this.sortFields = sortBy;
+ return this;
+ }
+
+ public SolrQuery build() {
+ SolrQuery solrQuery = new SolrQuery();
+
+ String query = queryText;
+ query = setEndValueOn(query);
+
+ solrQuery.setQuery(query);
+
+ if (filterQueryText != null) {
+ String filterQuery = filterQueryText;
+ filterQuery = setEndValueOn(filterQuery);
+
+ Set<String> paramNames = collectParamNames(filterQuery);
+ if (document != null) {
+ for (String parameter : paramNames) {
+ if (document.get(parameter) != null)
+ filterQuery = filterQuery.replace(String.format("${%s}", parameter), document.get(parameter));
+ }
+ }
+
+ if (document == null && paramNames.isEmpty() || document != null && !paramNames.isEmpty())
+ solrQuery.setFilterQueries(filterQuery);
+ }
+
+ if (sortFields != null) {
+ for (String field : sortFields)
+ solrQuery.addSort(field, asc);
+ }
+
+ return solrQuery;
+ }
+
+ private String setEndValueOn(String query) {
+ if (endValue != null)
+ query = query.replace("${end}", endValue);
+ return query;
+ }
+
+ private Set<String> collectParamNames(String filterQuery) {
+ Matcher matcher = PARAMETER_PATTERN.matcher(filterQuery);
+ Set<String> parameters = new HashSet<>();
+ while (matcher.find())
+ parameters.add(matcher.group().replace("${", "").replace("}", ""));
+ return parameters;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryProperties.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryProperties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryProperties.java
new file mode 100644
index 0000000..444a15b
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryProperties.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.hibernate.validator.constraints.NotBlank;
+
+public class SolrQueryProperties {
+ @NotBlank
+ private String collection;
+ @NotBlank
+ private String queryText;
+ private String filterQueryText;
+ private String[] sort;
+
+ public String getCollection() {
+ return collection;
+ }
+
+ public void setCollection(String collection) {
+ this.collection = collection;
+ }
+
+ public String getQueryText() {
+ return queryText;
+ }
+
+ public void setQueryText(String queryText) {
+ this.queryText = queryText;
+ }
+
+ public String getFilterQueryText() {
+ return filterQueryText;
+ }
+
+ public void setFilterQueryText(String filterQueryText) {
+ this.filterQueryText = filterQueryText;
+ }
+
+ public String[] getSort() {
+ return sort;
+ }
+
+ public void setSort(String[] sort) {
+ this.sort = sort;
+ }
+
+ public SolrQueryBuilder toQueryBuilder() {
+ return new SolrQueryBuilder().
+ setQueryText(queryText)
+ .setFilterQueryText(filterQueryText)
+ .addSort(sort);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/TarGzCompressor.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/TarGzCompressor.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/TarGzCompressor.java
new file mode 100644
index 0000000..8e34ca9
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/TarGzCompressor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.commons.io.IOUtils;
+
+import java.io.*;
+
+public class TarGzCompressor implements FileAction {
+ @Override
+ public File perform(File inputFile) {
+ File tarGzFile = new File(inputFile.getParent(), inputFile.getName() + ".tar.gz");
+ try (TarArchiveOutputStream tarArchiveOutputStream = new TarArchiveOutputStream(
+ new GzipCompressorOutputStream(new FileOutputStream(tarGzFile)))) {
+ TarArchiveEntry archiveEntry = new TarArchiveEntry(inputFile.getName());
+ archiveEntry.setSize(inputFile.length());
+ tarArchiveOutputStream.putArchiveEntry(archiveEntry);
+
+ try (FileInputStream fileInputStream = new FileInputStream(inputFile)) {
+ IOUtils.copy(fileInputStream, tarArchiveOutputStream);
+ }
+
+ tarArchiveOutputStream.closeArchiveEntry();
+ }
+ catch (IOException ex) {
+ throw new UncheckedIOException(ex);
+ }
+
+ return tarGzFile;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java
index fc0a4f7..862119a 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.ambari.infra.manager;
+import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import org.apache.ambari.infra.model.ExecutionContextResponse;
import org.apache.ambari.infra.model.JobDetailsResponse;
@@ -28,16 +29,14 @@ import org.apache.ambari.infra.model.JobOperationParams;
import org.apache.ambari.infra.model.StepExecutionContextResponse;
import org.apache.ambari.infra.model.StepExecutionInfoResponse;
import org.apache.ambari.infra.model.StepExecutionProgressResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.batch.admin.history.StepExecutionHistory;
import org.springframework.batch.admin.service.JobService;
import org.springframework.batch.admin.service.NoSuchStepExecutionException;
import org.springframework.batch.admin.web.JobInfo;
import org.springframework.batch.admin.web.StepExecutionProgress;
-import org.springframework.batch.core.JobExecution;
-import org.springframework.batch.core.JobInstance;
-import org.springframework.batch.core.JobParametersBuilder;
-import org.springframework.batch.core.JobParametersInvalidException;
-import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobExecutionNotRunningException;
import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException;
import org.springframework.batch.core.launch.JobOperator;
@@ -54,7 +53,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -64,6 +62,8 @@ import java.util.TimeZone;
@Named
public class JobManager {
+ private static final Logger LOG = LoggerFactory.getLogger(JobManager.class);
+
@Inject
private JobService jobService;
@@ -83,9 +83,14 @@ public class JobManager {
public JobExecutionInfoResponse launchJob(String jobName, String params)
throws JobParametersInvalidException, JobInstanceAlreadyExistsException, NoSuchJobException,
JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
- // TODO: handle params
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
- jobParametersBuilder.addDate("date", new Date());
+ if (params != null) {
+ LOG.info("Parsing parameters of job {} '{}'", jobName, params);
+ Splitter.on(',')
+ .trimResults()
+ .withKeyValueSeparator(Splitter.on('=').limit(2).trimResults())
+ .split(params).entrySet().forEach(entry -> jobParametersBuilder.addString(entry.getKey(), entry.getValue()));
+ }
return new JobExecutionInfoResponse(jobService.launch(jobName, jobParametersBuilder.toJobParameters()), timeZone);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties b/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties
index 8162376..7ef70aa 100644
--- a/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties
+++ b/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties
@@ -18,3 +18,15 @@ infra-manager.batch.db.username=admin
infra-manager.batch.db.password=admin
management.security.enabled=false
management.health.solr.enabled=false
+infra-manager.server.data.folder=/tmp
+
+infra-manager.jobs.solr_data_export.zoo_keeper_socket=zookeeper:2181
+infra-manager.jobs.solr_data_export.read_block_size=100
+infra-manager.jobs.solr_data_export.write_block_size=150
+infra-manager.jobs.solr_data_export.file_name_suffix_column=logtime
+infra-manager.jobs.solr_data_export.destination_directory_path=/tmp/ambariInfraManager
+infra-manager.jobs.solr_data_export.query.collection=hadoop_logs
+infra-manager.jobs.solr_data_export.query.query_text=logtime:[* TO "${end}"]
+infra-manager.jobs.solr_data_export.query.filter_query_text=(logtime:"${logtime}" AND id:{"${id}" TO *]) OR logtime:{"${logtime}" TO "${end}"]
+infra-manager.jobs.solr_data_export.query.sort[0]=logtime
+infra-manager.jobs.solr_data_export.query.sort[1]=id
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml b/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml
index ad1adcd..9737554 100644
--- a/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml
+++ b/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml
@@ -17,7 +17,7 @@
-->
<Configuration monitorinterval="30" status="info" strict="true">
<Properties>
- <Property name="logging.file">out/infra-manager.log</Property>
+ <Property name="logging.file">target/log/infra-manager.log</Property>
</Properties>
<Appenders>
<Appender type="Console" name="Console">
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java
new file mode 100644
index 0000000..88fbff0
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.infra.job.archive;
+
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.scope.context.StepContext;
+import org.springframework.batch.item.ExecutionContext;
+import org.springframework.batch.item.ItemStreamReader;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.HashMap;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+
+@RunWith(EasyMockRunner.class)
+public class DocumentExporterTest extends EasyMockSupport {
+
+ private DocumentExporter documentExporter;
+ @Mock
+ private ItemStreamReader<Document> reader;
+ @Mock
+ private DocumentDestination documentDestination;
+ @Mock
+ private DocumentItemWriter documentItemWriter;
+ @Mock
+ private DocumentItemWriter documentItemWriter2;
+
+ private ExecutionContext executionContext;
+ private ChunkContext chunkContext;
+ private static final Document DOCUMENT = new Document(new HashMap<String, String>() {{ put("id", "1"); }});
+
+ @Before
+ public void setUp() throws Exception {
+ StepExecution stepExecution = new StepExecution("exportDoc", new JobExecution(1L));
+ chunkContext = new ChunkContext(new StepContext(stepExecution));
+ executionContext = stepExecution.getExecutionContext();
+ documentExporter = new DocumentExporter(reader, documentDestination, 2);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ verifyAll();
+ }
+
+ @Test
+ public void testNothingToRead() throws Exception {
+ reader.open(executionContext); expectLastCall();
+ expect(reader.read()).andReturn(null);
+ reader.close(); expectLastCall();
+ replayAll();
+
+ documentExporter.execute(null, chunkContext);
+ }
+
+ @Test
+ public void testWriteLessDocumentsThanWriteBlockSize() throws Exception {
+ reader.open(executionContext); expectLastCall();
+ expect(reader.read()).andReturn(DOCUMENT);
+ expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
+ documentItemWriter.write(DOCUMENT); expectLastCall();
+ expect(reader.read()).andReturn(null);
+ reader.close(); expectLastCall();
+ documentItemWriter.close(); expectLastCall();
+ replayAll();
+
+ documentExporter.execute(null, chunkContext);
+ }
+
+ @Test
+ public void testWriteMoreDocumentsThanWriteBlockSize() throws Exception {
+ Document document2 = new Document(new HashMap<String, String>() {{ put("id", "2"); }});
+ Document document3 = new Document(new HashMap<String, String>() {{ put("id", "3"); }});
+
+ reader.open(executionContext); expectLastCall();
+ expect(reader.read()).andReturn(DOCUMENT);
+ expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
+ documentItemWriter.write(DOCUMENT); expectLastCall();
+ expect(reader.read()).andReturn(document2);
+ documentItemWriter.write(document2); expectLastCall();
+ expect(reader.read()).andReturn(document3);
+ documentItemWriter.close(); expectLastCall();
+ expect(documentDestination.open(document3)).andReturn(documentItemWriter2);
+ documentItemWriter2.write(document3); expectLastCall();
+ expect(reader.read()).andReturn(null);
+ reader.update(executionContext);
+ reader.close(); expectLastCall();
+ documentItemWriter2.close(); expectLastCall();
+ replayAll();
+
+ documentExporter.execute(null, chunkContext);
+ }
+
+ @Test(expected = IOException.class)
+ public void testReadError() throws Exception {
+ reader.open(executionContext); expectLastCall();
+ expect(reader.read()).andReturn(DOCUMENT);
+ expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
+ documentItemWriter.write(DOCUMENT); expectLastCall();
+ expect(reader.read()).andThrow(new IOException("TEST"));
+ documentItemWriter.revert(); expectLastCall();
+ reader.close(); expectLastCall();
+ replayAll();
+
+ documentExporter.execute(null, chunkContext);
+ }
+
+ @Test(expected = UncheckedIOException.class)
+ public void testWriteError() throws Exception {
+ reader.open(executionContext); expectLastCall();
+ expect(reader.read()).andReturn(DOCUMENT);
+ expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
+ documentItemWriter.write(DOCUMENT); expectLastCall().andThrow(new UncheckedIOException(new IOException("TEST")));
+ documentItemWriter.revert(); expectLastCall();
+ reader.close(); expectLastCall();
+ replayAll();
+
+ documentExporter.execute(null, chunkContext);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentItemReaderTest.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentItemReaderTest.java b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentItemReaderTest.java
new file mode 100644
index 0000000..942713f
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentItemReaderTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.infra.job.archive;
+
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.batch.item.ExecutionContext;
+
+import java.util.HashMap;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNull.nullValue;
+import static org.junit.Assert.assertThat;
+
+@RunWith(EasyMockRunner.class)
+public class DocumentItemReaderTest extends EasyMockSupport {
+ private static final Document DOCUMENT = new Document(new HashMap<String, String>() {{ put("id", "1"); }});
+ private static final Document DOCUMENT_2 = new Document(new HashMap<String, String>() {{ put("id", "2"); }});
+ private static final Document DOCUMENT_3 = new Document(new HashMap<String, String>() {{ put("id", "3"); }});
+ private static final int READ_BLOCK_SIZE = 2;
+
+ private DocumentItemReader documentItemReader;
+ @Mock
+ private DocumentSource documentSource;
+ @Mock
+ private DocumentIterator documentIterator;
+ @Mock
+ private DocumentIterator documentIterator2;
+
+ @Before
+ public void setUp() throws Exception {
+ documentItemReader = new DocumentItemReader(documentSource, READ_BLOCK_SIZE);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ verifyAll();
+ }
+
+ @Test
+ public void testReadWhenCollectionIsEmpty() throws Exception {
+ expect(documentSource.open(null, 2)).andReturn(documentIterator);
+ expect(documentIterator.next()).andReturn(null);
+ documentIterator.close(); expectLastCall();
+ replayAll();
+
+ assertThat(documentItemReader.read(), is(nullValue()));
+ assertThat(documentItemReader.isComplete(null), is(true));
+ assertThat(documentItemReader.isComplete(null, null), is(true));
+ }
+
+ @Test
+ public void testReadWhenCollectionContainsLessElementsThanReadBlockSize() throws Exception {
+ expect(documentSource.open(null, 2)).andReturn(documentIterator);
+ expect(documentIterator.next()).andReturn(DOCUMENT);
+ expect(documentIterator.next()).andReturn(null);
+ documentIterator.close(); expectLastCall();
+ replayAll();
+
+ assertThat(documentItemReader.read(), is(DOCUMENT));
+ assertThat(documentItemReader.isComplete(null), is(false));
+ assertThat(documentItemReader.isComplete(null, null), is(false));
+ assertThat(documentItemReader.read(), is(nullValue()));
+ assertThat(documentItemReader.isComplete(null), is(true));
+ assertThat(documentItemReader.isComplete(null, null), is(true));
+ }
+
+ @Test
+ public void testReadWhenCollectionContainsExactlySameCountElementsAsReadBlockSize() throws Exception {
+ expect(documentSource.open(null, 2)).andReturn(documentIterator);
+ expect(documentSource.open(DOCUMENT_2, 2)).andReturn(documentIterator2);
+ expect(documentIterator.next()).andReturn(DOCUMENT);
+ expect(documentIterator.next()).andReturn(DOCUMENT_2);
+ expect(documentIterator.next()).andReturn(null);
+ documentIterator.close(); expectLastCall();
+
+ expect(documentIterator2.next()).andReturn(null);
+ documentIterator2.close(); expectLastCall();
+ replayAll();
+
+ assertThat(documentItemReader.read(), is(DOCUMENT));
+ assertThat(documentItemReader.isComplete(null), is(false));
+ assertThat(documentItemReader.isComplete(null, null), is(false));
+ assertThat(documentItemReader.read(), is(DOCUMENT_2));
+ assertThat(documentItemReader.isComplete(null), is(false));
+ assertThat(documentItemReader.isComplete(null, null), is(false));
+ assertThat(documentItemReader.read(), is(nullValue()));
+ assertThat(documentItemReader.isComplete(null), is(true));
+ assertThat(documentItemReader.isComplete(null, null), is(true));
+ }
+
+ @Test
+ public void testReadWhenCollectionContainsMoreElementsThanReadBlockSize() throws Exception {
+ Document document3 = new Document(new HashMap<String, String>() {{ put("id", "2"); }});
+
+ expect(documentSource.open(null, 2)).andReturn(documentIterator);
+ expect(documentSource.open(DOCUMENT_2, 2)).andReturn(documentIterator2);
+ expect(documentIterator.next()).andReturn(DOCUMENT);
+ expect(documentIterator.next()).andReturn(DOCUMENT_2);
+ expect(documentIterator.next()).andReturn(null);
+ documentIterator.close(); expectLastCall();
+ expect(documentIterator2.next()).andReturn(document3);
+ expect(documentIterator2.next()).andReturn(null);
+ documentIterator2.close(); expectLastCall();
+
+ replayAll();
+
+ assertThat(documentItemReader.read(), is(DOCUMENT));
+ assertThat(documentItemReader.isComplete(null), is(false));
+ assertThat(documentItemReader.isComplete(null, null), is(false));
+
+ assertThat(documentItemReader.read(), is(DOCUMENT_2));
+ assertThat(documentItemReader.isComplete(null), is(false));
+ assertThat(documentItemReader.isComplete(null, null), is(false));
+
+ assertThat(documentItemReader.read(), is(document3));
+ assertThat(documentItemReader.isComplete(null), is(false));
+ assertThat(documentItemReader.isComplete(null, null), is(false));
+
+ assertThat(documentItemReader.read(), is(nullValue()));
+ assertThat(documentItemReader.isComplete(null), is(true));
+ assertThat(documentItemReader.isComplete(null, null), is(true));
+ }
+
+ @Test
+ public void testContinueWhenOnlyFirstElementWasRead() throws Exception {
+ expect(documentSource.open(null, 2)).andReturn(documentIterator);
+ expect(documentIterator.next()).andReturn(DOCUMENT);
+ documentIterator.close(); expectLastCall();
+ expect(documentSource.open(null, 2)).andReturn(documentIterator2);
+ expect(documentIterator2.next()).andReturn(DOCUMENT);
+ documentIterator2.close(); expectLastCall();
+ replayAll();
+
+ ExecutionContext executionContext = new ExecutionContext();
+ documentItemReader.open(executionContext);
+ assertThat(documentItemReader.read(), is(DOCUMENT));
+ documentItemReader.update(executionContext);
+ assertThat(executionContext.containsKey(DocumentItemReader.POSITION), is(false));
+ documentItemReader.close();
+
+ documentItemReader.open(executionContext);
+ assertThat(documentItemReader.read(), is(DOCUMENT));
+ documentItemReader.close();
+ }
+
+ @Test
+ public void testContinueWhenMoreThanOneElementWasRead() throws Exception {
+ expect(documentSource.open(null, 2)).andReturn(documentIterator);
+ expect(documentIterator.next()).andReturn(DOCUMENT);
+ expect(documentIterator.next()).andReturn(DOCUMENT_2);
+ documentIterator.close(); expectLastCall();
+ expect(documentSource.open(DOCUMENT, 2)).andReturn(documentIterator2);
+ expect(documentIterator2.next()).andReturn(DOCUMENT_2);
+ expect(documentIterator2.next()).andReturn(DOCUMENT_3);
+ documentIterator2.close(); expectLastCall();
+
+ replayAll();
+
+ ExecutionContext executionContext = new ExecutionContext();
+ documentItemReader.open(executionContext);
+ assertThat(documentItemReader.read(), is(DOCUMENT));
+ assertThat(documentItemReader.read(), is(DOCUMENT_2));
+ documentItemReader.update(executionContext);
+ assertThat(executionContext.get(DocumentItemReader.POSITION), is(DOCUMENT));
+ documentItemReader.close();
+
+ documentItemReader.open(executionContext);
+ assertThat(documentItemReader.read(), is(DOCUMENT_2));
+ assertThat(documentItemReader.read(), is(DOCUMENT_3));
+ documentItemReader.close();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriterTest.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriterTest.java b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriterTest.java
new file mode 100644
index 0000000..6411ff1
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriterTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.infra.job.archive;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.FileUtils;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.easymock.EasyMock.expect;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+@RunWith(EasyMockRunner.class)
+public class LocalDocumentItemWriterTest extends EasyMockSupport {
+
+ private static final Document DOCUMENT = new Document(new HashMap<String, String>() {{ put("id", "1"); }});
+ private static final Document DOCUMENT2 = new Document(new HashMap<String, String>() {{ put("id", "2"); }});
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private LocalDocumentItemWriter localDocumentItemWriter;
+ private File outFile;
+ @Mock
+ private FileAction fileAction;
+
+ @Before
+ public void setUp() throws Exception {
+ outFile = File.createTempFile("LocalDocumentItemWriterTest", "json.tmp");
+ localDocumentItemWriter = new LocalDocumentItemWriter(outFile, fileAction);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ outFile.delete();
+ verifyAll();
+ }
+
+ @Test
+ public void testWrite() throws Exception {
+ expect(fileAction.perform(outFile)).andReturn(outFile);
+ replayAll();
+
+ localDocumentItemWriter.write(DOCUMENT);
+ localDocumentItemWriter.write(DOCUMENT2);
+ localDocumentItemWriter.close();
+
+ List<Document> documentList = readBack(outFile);
+ assertThat(documentList.size(), is(2));
+ assertThat(documentList.get(0).get("id"), is(DOCUMENT.get("id")));
+ assertThat(documentList.get(1).get("id"), is(DOCUMENT2.get("id")));
+ }
+
+ private List<Document> readBack(File file) throws IOException {
+ List<Document> documentList = new ArrayList<>();
+ for (String line : FileUtils.readLines(file)) {
+ documentList.add(OBJECT_MAPPER.readValue(line, Document.class));
+ }
+ return documentList;
+ }
+
+ @Test
+ public void testRevert() throws Exception {
+ replayAll();
+
+ localDocumentItemWriter.write(DOCUMENT);
+ localDocumentItemWriter.revert();
+
+ assertThat(outFile.exists(), is(false));
+ }
+}
\ No newline at end of file