You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/01/26 10:22:17 UTC
[ambari] branch trunk updated: AMBARI-22799 - define scheduling of archiving Infra Solr Documents
This is an automated email from the ASF dual-hosted git repository.
oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 19a8e75 AMBARI-22799 - define scheduling of archiving Infra Solr Documents
19a8e75 is described below
commit 19a8e75d3887d9354a3b14a36c362f73d962e9bf
Author: kkasa <ka...@gmail.com>
AuthorDate: Tue Jan 16 16:13:24 2018 +0100
AMBARI-22799 - define scheduling of archiving Infra Solr Documents
---
.../org/apache/ambari/infra/HttpResponse.java} | 20 +++-
.../java/org/apache/ambari/infra/InfraClient.java | 32 ++++--
.../org/apache/ambari/infra/JobExecutionInfo.java} | 28 ++++-
.../ambari/infra/steps/AbstractInfraSteps.java | 12 ++-
.../apache/ambari/infra/steps/ExportJobsSteps.java | 66 +++++++++---
.../test/resources/stories/infra_api_tests.story | 26 +++--
.../ambari-infra-manager/docs/api/swagger.yaml | 2 +-
.../InfraManagerSchedulingConfig.java} | 15 ++-
.../infra/conf/batch/InfraManagerBatchConfig.java | 25 +++--
.../infra/job/AbstractJobsConfiguration.java | 78 ++++++++++++++
...ertyMap.java => JobConfigurationException.java} | 8 +-
...{PropertyMap.java => JobContextRepository.java} | 7 +-
.../ambari/infra/job/JobContextRepositoryImpl.java | 52 ++++++++++
.../org/apache/ambari/infra/job/JobProperties.java | 35 +++++++
.../org/apache/ambari/infra/job/JobScheduler.java | 89 ++++++++++++++++
.../{JobPropertyMap.java => JobsPropertyMap.java} | 12 ++-
...{PropertyMap.java => SchedulingProperties.java} | 30 +++++-
.../archive/DocumentArchivingConfiguration.java | 59 +++++------
.../job/archive/DocumentArchivingProperties.java | 1 +
.../job/archive/DocumentArchivingPropertyMap.java | 8 +-
.../ambari/infra/job/archive/DocumentExporter.java | 22 +++-
.../infra/job/archive/FileNameSuffixFormatter.java | 2 +-
.../deleting/DocumentDeletingConfiguration.java | 53 ++++------
.../job/deleting/DocumentDeletingPropertyMap.java | 8 +-
.../apache/ambari/infra/manager/JobManager.java | 64 +++++++-----
.../java/org/apache/ambari/infra/manager/Jobs.java | 42 ++++++++
.../org/apache/ambari/infra/rest/JobResource.java | 28 +++--
.../src/main/resources/infra-manager.properties | 8 +-
.../src/main/resources/log4j2.xml | 3 +
.../apache/ambari/infra/job/JobSchedulerTest.java | 114 +++++++++++++++++++++
.../infra/job/archive/DocumentExporterTest.java | 110 ++++++++++++++++----
.../vagrant-infra-manager.properties.sample | 30 +++---
32 files changed, 863 insertions(+), 226 deletions(-)
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/HttpResponse.java
similarity index 72%
copy from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
copy to ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/HttpResponse.java
index e7b9e77..3d8711b 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/HttpResponse.java
@@ -16,10 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.ambari.infra.job;
+package org.apache.ambari.infra;
-import java.util.Map;
+public class HttpResponse {
+ private final int code;
+ private final String body;
-public interface PropertyMap<T extends JobProperties<T>> {
- Map<String, T> getPropertyMap();
+ public HttpResponse(int code, String body) {
+ this.code = code;
+ this.body = body;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public String getBody() {
+ return body;
+ }
}
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
index b798ce1..0118c76 100644
--- a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
@@ -71,11 +72,14 @@ public class InfraClient implements AutoCloseable {
execute(new HttpGet(baseUrl));
}
- private String execute(HttpRequestBase post) {
+ private HttpResponse execute(HttpRequestBase post) {
try (CloseableHttpResponse response = httpClient.execute(post)) {
String responseBodyText = IOUtils.toString(response.getEntity().getContent(), Charset.defaultCharset());
- LOG.info("Response code {} body {} ", response.getStatusLine().getStatusCode(), responseBodyText);
- return responseBodyText;
+ int statusCode = response.getStatusLine().getStatusCode();
+ LOG.info("Response code {} body {} ", statusCode, responseBodyText);
+ if (!(200 <= statusCode && statusCode <= 299))
+ throw new RuntimeException("Error while executing http request: " + responseBodyText);
+ return new HttpResponse(statusCode, responseBodyText);
} catch (ClientProtocolException e) {
throw new RuntimeException(e);
} catch (IOException e) {
@@ -83,16 +87,16 @@ public class InfraClient implements AutoCloseable {
}
}
- public String startJob(String jobName, String parameters) {
+ public JobExecutionInfo startJob(String jobName, String parameters) {
URIBuilder uriBuilder = new URIBuilder(baseUrl);
uriBuilder.setScheme("http");
uriBuilder.setPath(uriBuilder.getPath() + "/" + jobName);
if (!isBlank(parameters))
uriBuilder.addParameter("params", parameters);
try {
- String responseText = execute(new HttpPost(uriBuilder.build()));
+ String responseText = execute(new HttpPost(uriBuilder.build())).getBody();
Map<String, Object> responseContent = new ObjectMapper().readValue(responseText, new TypeReference<HashMap<String,Object>>() {});
- return responseContent.get("jobId").toString();
+ return new JobExecutionInfo(responseContent.get("jobId").toString(), ((Map)responseContent.get("jobExecutionData")).get("id").toString());
} catch (URISyntaxException | JsonParseException | JsonMappingException e) {
throw new RuntimeException(e);
} catch (IOException e) {
@@ -106,7 +110,21 @@ public class InfraClient implements AutoCloseable {
uriBuilder.setPath(String.format("%s/%s/%s/executions", uriBuilder.getPath(), jobName, jobId));
uriBuilder.addParameter("operation", "RESTART");
try {
- execute(new HttpPost(uriBuilder.build()));
+ HttpResponse httpResponse = execute(new HttpPost(uriBuilder.build()));
+ if (httpResponse.getCode() != 200)
+ throw new RuntimeException(httpResponse.getBody());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void stopJob(String jobExecutionId) {
+ URIBuilder uriBuilder = new URIBuilder(baseUrl);
+ uriBuilder.setScheme("http");
+ uriBuilder.setPath(String.format("%s/executions/%s", uriBuilder.getPath(), jobExecutionId));
+ uriBuilder.addParameter("operation", "STOP");
+ try {
+ execute(new HttpDelete(uriBuilder.build()));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/JobExecutionInfo.java
similarity index 59%
copy from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
copy to ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/JobExecutionInfo.java
index e7b9e77..92b7834 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/JobExecutionInfo.java
@@ -16,10 +16,30 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.ambari.infra.job;
+package org.apache.ambari.infra;
-import java.util.Map;
+public class JobExecutionInfo {
+ private final String jobId;
+ private final String executionId;
-public interface PropertyMap<T extends JobProperties<T>> {
- Map<String, T> getPropertyMap();
+ public JobExecutionInfo(String jobId, String executionId) {
+ this.jobId = jobId;
+ this.executionId = executionId;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public String getExecutionId() {
+ return executionId;
+ }
+
+ @Override
+ public String toString() {
+ return "JobExecutionInfo{" +
+ "jobId='" + jobId + '\'' +
+ ", executionId='" + executionId + '\'' +
+ '}';
+ }
}
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java
index ece1c59..fb8a100 100644
--- a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java
@@ -64,6 +64,7 @@ public abstract class AbstractInfraSteps {
private static final int FAKE_S3_PORT = 4569;
private static final int HDFS_PORT = 9000;
private static final String AUDIT_LOGS_COLLECTION = "audit_logs";
+ private static final String HADOOP_LOGS_COLLECTION = "hadoop_logs";
protected static final String S3_BUCKET_NAME = "testbucket";
private String ambariFolder;
private String shellScriptLocation;
@@ -111,8 +112,8 @@ public abstract class AbstractInfraSteps {
SOLR_PORT,
AUDIT_LOGS_COLLECTION)).build();
- LOG.info("Creating collection");
- runCommand(new String[]{"docker", "exec", "docker_solr_1", "solr", "create_collection", "-c", AUDIT_LOGS_COLLECTION, "-d", "configsets/"+ AUDIT_LOGS_COLLECTION +"/conf", "-n", AUDIT_LOGS_COLLECTION + "_conf"});
+ createSolrCollection(AUDIT_LOGS_COLLECTION);
+ createSolrCollection(HADOOP_LOGS_COLLECTION);
LOG.info("Initializing s3 client");
s3client = new AmazonS3Client(new BasicAWSCredentials("remote-identity", "remote-credential"));
@@ -122,6 +123,11 @@ public abstract class AbstractInfraSteps {
checkInfraManagerReachable();
}
+ private void createSolrCollection(String collectionName) {
+ LOG.info("Creating collection");
+ runCommand(new String[]{"docker", "exec", "docker_solr_1", "solr", "create_collection", "-c", collectionName, "-d", "configsets/"+ collectionName +"/conf", "-n", collectionName + "_conf"});
+ }
+
private void runCommand(String[] command) {
try {
LOG.info("Exec command: {}", StringUtils.join(command, " "));
@@ -146,7 +152,7 @@ public abstract class AbstractInfraSteps {
});
}
- private void doWithin(int sec, String actionName, Runnable runnable) {
+ protected void doWithin(int sec, String actionName, Runnable runnable) {
long start = currentTimeMillis();
Exception exception;
while (true) {
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java
index 7e54a31..d4224c6 100644
--- a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java
@@ -23,6 +23,7 @@ import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.ambari.infra.InfraClient;
+import org.apache.ambari.infra.JobExecutionInfo;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@@ -56,7 +57,7 @@ import static org.junit.Assert.assertThat;
public class ExportJobsSteps extends AbstractInfraSteps {
private static final Logger LOG = LoggerFactory.getLogger(ExportJobsSteps.class);
- private Map<String, String> launchedJobs = new HashMap<>();
+ private Map<String, JobExecutionInfo> launchedJobs = new HashMap<>();
@Given("$count documents in solr")
public void addDocuments(int count) throws Exception {
@@ -85,22 +86,39 @@ public class ExportJobsSteps extends AbstractInfraSteps {
@When("start $jobName job")
public void startJob(String jobName) throws Exception {
- startJob(jobName, null);
+ startJob(jobName, null, 0);
}
- @When("start $jobName job with parameters $parameters")
- public void startJob(String jobName, String parameters) throws Exception {
+ @When("start $jobName job with parameters $parameters after $waitSec seconds")
+ public void startJob(String jobName, String parameters, int waitSec) throws Exception {
+ Thread.sleep(waitSec * 1000);
try (InfraClient httpClient = getInfraClient()) {
- String jobId = httpClient.startJob(jobName, parameters);
- LOG.info("Job {} started jobId: {}", jobName, jobId);
- launchedJobs.put(jobName, jobId);
+ JobExecutionInfo jobExecutionInfo = httpClient.startJob(jobName, parameters);
+ LOG.info("Job {} started: {}", jobName, jobExecutionInfo);
+ launchedJobs.put(jobName, jobExecutionInfo);
}
}
- @When("restart $jobName job")
- public void restartJob(String jobName) throws Exception {
+ @When("restart $jobName job within $waitSec seconds")
+ public void restartJob(String jobName, int waitSec) {
+ doWithin(waitSec, "Restarting job " + jobName, () -> {
+ try (InfraClient httpClient = getInfraClient()) {
+ httpClient.restartJob(jobName, launchedJobs.get(jobName).getJobId());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @When("stop job $jobName after at least $count file exists in s3 with filename containing text $text within $waitSec seconds")
+ public void stopJob(String jobName, int count, String text, int waitSec) throws Exception {
+ AmazonS3Client s3Client = getS3client();
+ ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
+ doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME)
+ && fileCountOnS3(text, s3Client, listObjectsRequest) > count);
+
try (InfraClient httpClient = getInfraClient()) {
- httpClient.restartJob(jobName, launchedJobs.get(jobName));
+ httpClient.stopJob(launchedJobs.get(jobName).getExecutionId());
}
}
@@ -125,9 +143,25 @@ public class ExportJobsSteps extends AbstractInfraSteps {
AmazonS3Client s3Client = getS3client();
ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME)
- && s3Client.listObjects(listObjectsRequest).getObjectSummaries().stream()
- .filter(s3ObjectSummary -> s3ObjectSummary.getKey().contains(text))
- .count() == count);
+ && fileCountOnS3(text, s3Client, listObjectsRequest) == count);
+ }
+
+ private long fileCountOnS3(String text, AmazonS3Client s3Client, ListObjectsRequest listObjectsRequest) {
+ return s3Client.listObjects(listObjectsRequest).getObjectSummaries().stream()
+ .filter(s3ObjectSummary -> s3ObjectSummary.getKey().contains(text))
+ .count();
+ }
+
+ @Then("Less than $count files exists on s3 server with filenames containing the text $text after $waitSec seconds")
+ public void checkLessThanFileExistsOnS3(long count, String text, int waitSec) {
+ AmazonS3Client s3Client = getS3client();
+ ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
+ doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME) && between(
+ fileCountOnS3(text, s3Client, listObjectsRequest), 1L, count - 1L));
+ }
+
+ private boolean between(long count, long from, long to) {
+ return from <= count && count <= to;
}
@Then("No file exists on s3 server with filenames containing the text $text")
@@ -184,9 +218,9 @@ public class ExportJobsSteps extends AbstractInfraSteps {
}
}
- @Then("Check $count files exists on local filesystem with filenames containing the text $text in the folder $path")
- public void checkNumberOfFilesOnLocalFilesystem(long count, String text, String path) {
- File destinationDirectory = new File(getLocalDataFolder(), path);
+ @Then("Check $count files exists on local filesystem with filenames containing the text $text in the folder $path for job $jobName")
+ public void checkNumberOfFilesOnLocalFilesystem(long count, String text, String path, String jobName) {
+ File destinationDirectory = new File(getLocalDataFolder(), path.replace("${jobId}", launchedJobs.get(jobName).getJobId()));
LOG.info("Destination directory path: {}", destinationDirectory.getAbsolutePath());
doWithin(5, "Destination directory exists", destinationDirectory::exists);
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story b/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story
index d1eb4a4..122a634 100644
--- a/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story
+++ b/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story
@@ -8,7 +8,7 @@ Then Check filenames contains the text audit_logs on s3 server after 20 seconds
Scenario: Exporting 10 documents using writeBlockSize=3 produces 4 files
Given 10 documents in solr with logtime from 2010-10-09T05:00:00.000Z to 2010-10-09T20:00:00.000Z
-When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-10-09T00:00:00.000Z,end=2010-10-11T00:00:00.000Z
+When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-10-09T00:00:00.000Z,end=2010-10-11T00:00:00.000Z after 2 seconds
Then Check 4 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2010-10-09 after 20 seconds
And solr does not contain documents between 2010-10-09T05:00:00.000Z and 2010-10-09T20:00:00.000Z after 5 seconds
@@ -16,7 +16,7 @@ And solr does not contain documents between 2010-10-09T05:00:00.000Z and 2010-10
Scenario: Running archiving job with a bigger start value than end value exports and deletes 0 documents
Given 10 documents in solr with logtime from 2010-01-01T05:00:00.000Z to 2010-01-04T05:00:00.000Z
-When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-01-03T05:00:00.000Z,end=2010-01-02T05:00:00.000Z
+When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-01-03T05:00:00.000Z,end=2010-01-02T05:00:00.000Z after 2 seconds
Then No file exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2010-01-0
And solr contains 10 documents between 2010-01-01T05:00:00.000Z and 2010-01-04T05:00:00.000Z
@@ -25,11 +25,11 @@ Scenario: Archiving job fails when part of the data is exported. After resolving
Given 200 documents in solr with logtime from 2011-10-09T05:00:00.000Z to 2011-10-09T20:00:00.000Z
And a file on s3 with key solr_archive_audit_logs_-_2011-10-09T08-00-00.000Z.json.tar.gz
-When start archive_audit_logs job with parameters writeBlockSize=20,start=2010-11-09T00:00:00.000Z,end=2011-10-11T00:00:00.000Z
+When start archive_audit_logs job with parameters writeBlockSize=20,start=2010-11-09T00:00:00.000Z,end=2011-10-11T00:00:00.000Z after 2 seconds
Then Check 3 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2011-10-09 after 20 seconds
And solr does not contain documents between 2011-10-09T05:00:00.000Z and 2011-10-09T07:59:59.999Z after 5 seconds
When delete file with key solr_archive_audit_logs_-_2011-10-09T08-00-00.000Z.json.tar.gz from s3
-And restart archive_audit_logs job
+And restart archive_audit_logs job within 2 seconds
Then Check 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2011-10-09 after 20 seconds
And solr does not contain documents between 2011-10-09T05:00:00.000Z and 2011-10-09T20:00:00.000Z after 5 seconds
@@ -37,14 +37,14 @@ And solr does not contain documents between 2011-10-09T05:00:00.000Z and 2011-10
Scenario: After Deleting job deletes documents from solr no document found in the specified interval
Given 10 documents in solr with logtime from 2012-10-09T05:00:00.000Z to 2012-10-09T20:00:00.000Z
-When start delete_audit_logs job with parameters start=2012-10-09T05:00:00.000Z,end=2012-10-09T20:00:00.000Z
+When start delete_audit_logs job with parameters start=2012-10-09T05:00:00.000Z,end=2012-10-09T20:00:00.000Z after 2 seconds
Then solr does not contain documents between 2012-10-09T05:00:00.000Z and 2012-10-09T20:00:00.000Z after 5 seconds
Scenario: Archiving documents to hdfs
Given 1000 documents in solr with logtime from 2014-01-04T05:00:00.000Z to 2014-01-06T20:00:00.000Z
-When start archive_audit_logs job with parameters start=2014-01-04T05:00:00.000Z,end=2014-01-06T20:00:00.000Z,destination=HDFS
+When start archive_audit_logs job with parameters start=2014-01-04T05:00:00.000Z,end=2014-01-06T20:00:00.000Z,destination=HDFS after 2 seconds
Then Check 7 files exists on hdfs with filenames containing the text audit_logs_-_2014-01-0 in the folder /test_audit_logs after 10 seconds
And solr does not contain documents between 2014-01-04T05:00:00.000Z and 2014-01-06T20:00:00.000Z after 10 seconds
@@ -52,6 +52,16 @@ And solr does not contain documents between 2014-01-04T05:00:00.000Z and 2014-01
Scenario: Archiving documents to local filesystem
Given 200 documents in solr with logtime from 2014-02-04T05:00:00.000Z to 2014-02-06T20:00:00.000Z
-When start archive_audit_logs job with parameters start=2014-02-04T05:00:00.000Z,end=2014-02-06T20:00:00.000Z,destination=LOCAL,localDestinationDirectory=/root/archive
-Then Check 2 files exists on local filesystem with filenames containing the text audit_logs_-_2014-02-0 in the folder audit_logs_8_2014-02-06T20-00-00.000Z
+When start archive_audit_logs job with parameters start=2014-02-04T05:00:00.000Z,end=2014-02-06T20:00:00.000Z,destination=LOCAL,localDestinationDirectory=/root/archive after 2 seconds
+Then Check 2 files exists on local filesystem with filenames containing the text audit_logs_-_2014-02-0 in the folder audit_logs_${jobId}_2014-02-06T20-00-00.000Z for job archive_audit_logs
And solr does not contain documents between 2014-02-04T05:00:00.000Z and 2014-02-06T20:00:00.000Z after 10 seconds
+
+
+Scenario: Launch Archiving job. Initiate stop and check that part of the data is archived. After restart all data must be extracted.
+
+Given 200 documents in solr with logtime from 2014-03-09T05:00:00.000Z to 2014-03-09T20:00:00.000Z
+When start archive_audit_logs job with parameters writeBlockSize=20,start=2014-03-09T05:00:00.000Z,end=2014-03-09T20:00:00.000Z after 2 seconds
+And stop job archive_audit_logs after at least 1 file exists in s3 with filename containing text solr_archive_audit_logs_-_2014-03-09 within 10 seconds
+Then Less than 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2014-03-09 after 20 seconds
+When restart archive_audit_logs job within 10 seconds
+Then Check 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2014-03-09 after 20 seconds
diff --git a/ambari-infra/ambari-infra-manager/docs/api/swagger.yaml b/ambari-infra/ambari-infra-manager/docs/api/swagger.yaml
index 824629f..6fad22d 100644
--- a/ambari-infra/ambari-infra-manager/docs/api/swagger.yaml
+++ b/ambari-infra/ambari-infra-manager/docs/api/swagger.yaml
@@ -65,7 +65,7 @@ paths:
- "jobs"
summary: "Get job and step details for job execution instance."
description: ""
- operationId: "getExectionInfo"
+ operationId: "getExecutionInfo"
produces:
- "application/json"
parameters:
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/InfraManagerSchedulingConfig.java
similarity index 65%
copy from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
copy to ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/InfraManagerSchedulingConfig.java
index e7b9e77..bb495a2 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/InfraManagerSchedulingConfig.java
@@ -16,10 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.ambari.infra.job;
+package org.apache.ambari.infra.conf;
-import java.util.Map;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
-public interface PropertyMap<T extends JobProperties<T>> {
- Map<String, T> getPropertyMap();
+@Configuration
+public class InfraManagerSchedulingConfig {
+ @Bean
+ public TaskScheduler taskScheduler() {
+ return new ThreadPoolTaskScheduler();
+ }
}
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/batch/InfraManagerBatchConfig.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/batch/InfraManagerBatchConfig.java
index b1169b4..706ed8b 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/batch/InfraManagerBatchConfig.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/batch/InfraManagerBatchConfig.java
@@ -30,6 +30,7 @@ import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.explore.JobExplorer;
+import org.springframework.batch.core.explore.support.JobExplorerFactoryBean;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
@@ -56,7 +57,6 @@ import org.springframework.transaction.PlatformTransactionManager;
import javax.inject.Inject;
import javax.sql.DataSource;
-import java.net.MalformedURLException;
@Configuration
@EnableBatchProcessing
@@ -85,9 +85,6 @@ public class InfraManagerBatchConfig {
@Inject
private JobRegistry jobRegistry;
- @Inject
- private JobExplorer jobExplorer;
-
@Bean
public DataSource dataSource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
@@ -99,8 +96,7 @@ public class InfraManagerBatchConfig {
}
@Bean
- public DataSourceInitializer dataSourceInitializer(DataSource dataSource)
- throws MalformedURLException {
+ public DataSourceInitializer dataSourceInitializer() {
ResourceDatabasePopulator databasePopulator = new ResourceDatabasePopulator();
if (dropDatabaseOnStartup) {
databasePopulator.addScript(dropRepositoryTables);
@@ -110,7 +106,7 @@ public class InfraManagerBatchConfig {
databasePopulator.setContinueOnError(true);
DataSourceInitializer initializer = new DataSourceInitializer();
- initializer.setDataSource(dataSource);
+ initializer.setDataSource(dataSource());
initializer.setDatabasePopulator(databasePopulator);
return initializer;
@@ -125,14 +121,14 @@ public class InfraManagerBatchConfig {
public JobRepository jobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource());
- factory.setTransactionManager(getTransactionManager());
+ factory.setTransactionManager(transactionManager());
factory.setSerializer(executionContextSerializer());
factory.afterPropertiesSet();
return factory.getObject();
}
@Bean
- public PlatformTransactionManager getTransactionManager() {
+ public PlatformTransactionManager transactionManager() {
return new ResourcelessTransactionManager();
}
@@ -148,7 +144,7 @@ public class InfraManagerBatchConfig {
@Bean
public JobOperator jobOperator() throws Exception {
SimpleJobOperator jobOperator = new SimpleJobOperator();
- jobOperator.setJobExplorer(jobExplorer);
+ jobOperator.setJobExplorer(jobExplorer());
jobOperator.setJobLauncher(jobLauncher());
jobOperator.setJobRegistry(jobRegistry);
jobOperator.setJobRepository(jobRepository());
@@ -156,6 +152,15 @@ public class InfraManagerBatchConfig {
}
@Bean
+ public JobExplorer jobExplorer() throws Exception {
+ JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
+ factoryBean.setSerializer(executionContextSerializer());
+ factoryBean.setDataSource(dataSource());
+ factoryBean.afterPropertiesSet();
+ return factoryBean.getObject();
+ }
+
+ @Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/AbstractJobsConfiguration.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/AbstractJobsConfiguration.java
new file mode 100644
index 0000000..a57d0e0
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/AbstractJobsConfiguration.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.Job;
+import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
+import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
+import org.springframework.batch.core.job.builder.JobBuilder;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
+
+import javax.annotation.PostConstruct;
+import java.util.Map;
+
+public abstract class AbstractJobsConfiguration<T extends JobProperties<T>> {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractJobsConfiguration.class);
+
+ private final Map<String, T> propertyMap;
+ private final JobScheduler scheduler;
+ private final JobBuilderFactory jobs;
+ private final JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor;
+
+ protected AbstractJobsConfiguration(Map<String, T> propertyMap, JobScheduler scheduler, JobBuilderFactory jobs, JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor) {
+ this.propertyMap = propertyMap;
+ this.scheduler = scheduler;
+ this.jobs = jobs;
+ this.jobRegistryBeanPostProcessor = jobRegistryBeanPostProcessor;
+ }
+
+ @PostConstruct
+ public void registerJobs() {
+ if (propertyMap == null)
+ return;
+
+ for (String jobName : propertyMap.keySet())
+ propertyMap.get(jobName).validate(jobName);
+
+ propertyMap.keySet().stream()
+ .filter(key -> propertyMap.get(key).isEnabled())
+ .forEach(jobName -> {
+ LOG.info("Registering job {}", jobName);
+ JobBuilder jobBuilder = jobs.get(jobName).listener(new JobsPropertyMap<>(propertyMap));
+ Job job = buildJob(jobBuilder);
+ jobRegistryBeanPostProcessor.postProcessAfterInitialization(job, jobName);
+ });
+ }
+
+ @EventListener(ApplicationReadyEvent.class)
+ public void scheduleJobs() {
+ if (propertyMap == null)
+ return;
+
+ propertyMap.keySet().stream()
+ .filter(key -> propertyMap.get(key).isEnabled())
+ .forEach(jobName -> propertyMap.get(jobName).scheduling().ifPresent(
+ schedulingProperties -> scheduler.schedule(jobName, schedulingProperties)));
+ }
+
+ protected abstract Job buildJob(JobBuilder jobBuilder);
+}
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobConfigurationException.java
similarity index 84%
copy from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
copy to ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobConfigurationException.java
index e7b9e77..8c16daa 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobConfigurationException.java
@@ -18,8 +18,8 @@
*/
package org.apache.ambari.infra.job;
-import java.util.Map;
-
-public interface PropertyMap<T extends JobProperties<T>> {
- Map<String, T> getPropertyMap();
+public class JobConfigurationException extends RuntimeException {
+ public JobConfigurationException(String message, Exception ex) {
+ super(message, ex);
+ }
}
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobContextRepository.java
similarity index 79%
copy from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
copy to ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobContextRepository.java
index e7b9e77..eb7f717 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobContextRepository.java
@@ -18,8 +18,9 @@
*/
package org.apache.ambari.infra.job;
-import java.util.Map;
+import org.springframework.batch.core.StepExecution;
-public interface PropertyMap<T extends JobProperties<T>> {
- Map<String, T> getPropertyMap();
+public interface JobContextRepository {
+ StepExecution getStepExecution(Long jobExecutionId, Long id);
+ void updateExecutionContext(StepExecution stepExecution);
}
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobContextRepositoryImpl.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobContextRepositoryImpl.java
new file mode 100644
index 0000000..fbb256f
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobContextRepositoryImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job;
+
+import org.springframework.batch.admin.service.JobService;
+import org.springframework.batch.admin.service.NoSuchStepExecutionException;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.launch.NoSuchJobExecutionException;
+import org.springframework.batch.core.repository.JobRepository;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+
+@Named
+public class JobContextRepositoryImpl implements JobContextRepository {
+ // Reads step executions through JobService and stores execution contexts through JobRepository.
+ @Inject
+ private JobRepository jobRepository;
+ @Inject
+ private JobService jobService;
+
+ /** Looks up a step execution via JobService; a missing job or step execution is rethrown as an unchecked RuntimeException. */
+ @Override
+ public StepExecution getStepExecution(Long jobExecutionId, Long id) {
+ try {
+ return jobService.getStepExecution(jobExecutionId, id);
+ } catch (NoSuchStepExecutionException | NoSuchJobExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ /** Delegates to JobRepository.updateExecutionContext for the given step execution. */
+ @Override
+ public void updateExecutionContext(StepExecution stepExecution) {
+ jobRepository.updateExecutionContext(stepExecution);
+ }
+}
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java
index 292e15e..53909ae 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java
@@ -23,14 +23,32 @@ import org.springframework.batch.core.JobParameters;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.Optional;
public abstract class JobProperties<T extends JobProperties<T>> {
+
+ private SchedulingProperties scheduling;
private final Class<T> clazz;
+ private boolean enabled;
protected JobProperties(Class<T> clazz) {
this.clazz = clazz;
}
+ public SchedulingProperties getScheduling() {
+ return scheduling;
+ }
+
+ public Optional<SchedulingProperties> scheduling() {
+ if (scheduling != null && scheduling.isEnabled())
+ return Optional.of(scheduling);
+ return Optional.empty();
+ }
+
+ public void setScheduling(SchedulingProperties scheduling) {
+ this.scheduling = scheduling;
+ }
+
public T deepCopy() {
try {
ObjectMapper objectMapper = new ObjectMapper();
@@ -44,4 +62,21 @@ public abstract class JobProperties<T extends JobProperties<T>> {
public abstract void apply(JobParameters jobParameters);
public abstract void validate();
+
+ public void validate(String jobName) {
+ try {
+ validate();
+ }
+ catch (Exception ex) {
+ throw new JobConfigurationException(String.format("Configuration of job %s is invalid!", jobName), ex);
+ }
+ }
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
}
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java
new file mode 100644
index 0000000..324c0b3
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job;
+
+import org.apache.ambari.infra.manager.Jobs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobParametersBuilder;
+import org.springframework.batch.core.JobParametersInvalidException;
+import org.springframework.batch.core.launch.NoSuchJobException;
+import org.springframework.batch.core.launch.NoSuchJobExecutionException;
+import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
+import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
+import org.springframework.batch.core.repository.JobRestartException;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.support.CronTrigger;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+import java.time.Duration;
+import java.time.OffsetDateTime;
+
+import static org.apache.ambari.infra.job.archive.FileNameSuffixFormatter.SOLR_DATETIME_FORMATTER;
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+@Named
+public class JobScheduler {
+ private static final Logger LOG = LoggerFactory.getLogger(JobScheduler.class);
+
+ private final TaskScheduler scheduler;
+ private final Jobs jobs;
+
+ @Inject
+ public JobScheduler(TaskScheduler scheduler, Jobs jobs) {
+ this.scheduler = scheduler;
+ this.jobs = jobs;
+ }
+ /** Restarts the last run of the job if it failed, then registers a cron-triggered launch of the job. */
+ public void schedule(String jobName, SchedulingProperties schedulingProperties) {
+ try {
+ jobs.lastRun(jobName).ifPresent(this::restartIfFailed);
+ } catch (NoSuchJobException | NoSuchJobExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ // The lambda runs on every cron fire, so the "end" job parameter is computed at launch time, not at scheduling time.
+ scheduler.schedule(() -> launchJob(jobName, schedulingProperties.getIntervalEndDelta()), new CronTrigger(schedulingProperties.getCron()));
+ LOG.info("Job {} scheduled for running. Cron: {}", jobName, schedulingProperties.getCron());
+ }
+ /** Restarts the execution when its exit code is FAILED; exit codes are compared because ExitStatus is a value object and a reference check only matches the constant itself. */
+ private void restartIfFailed(JobExecution jobExecution) {
+ if (ExitStatus.FAILED.getExitCode().equals(jobExecution.getExitStatus().getExitCode())) {
+ try {
+ jobs.restart(jobExecution.getId());
+ } catch (JobInstanceAlreadyCompleteException | NoSuchJobException | JobExecutionAlreadyRunningException | JobRestartException | JobParametersInvalidException | NoSuchJobExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ /** Launches the job; when endDelta is not blank it is parsed as a Duration and "end" is set to now minus that duration, formatted with SOLR_DATETIME_FORMATTER. */
+ private void launchJob(String jobName, String endDelta) {
+ try {
+ JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
+ if (!isBlank(endDelta))
+ jobParametersBuilder.addString("end", SOLR_DATETIME_FORMATTER.format(OffsetDateTime.now().minus(Duration.parse(endDelta))));
+
+ jobs.launchJob(jobName, jobParametersBuilder.toJobParameters());
+ } catch (JobParametersInvalidException | NoSuchJobException | JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobPropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobsPropertyMap.java
similarity index 85%
rename from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobPropertyMap.java
rename to ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobsPropertyMap.java
index b5061f8..094e797 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobPropertyMap.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobsPropertyMap.java
@@ -22,11 +22,13 @@ import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
-public class JobPropertyMap<T extends JobProperties<T>> implements JobExecutionListener {
+import java.util.Map;
- private final PropertyMap<T> propertyMap;
+public class JobsPropertyMap<T extends JobProperties<T>> implements JobExecutionListener {
- public JobPropertyMap(PropertyMap<T> propertyMap) {
+ private final Map<String, T> propertyMap;
+
+ public JobsPropertyMap(Map<String, T> propertyMap) {
this.propertyMap = propertyMap;
}
@@ -34,13 +36,13 @@ public class JobPropertyMap<T extends JobProperties<T>> implements JobExecutionL
public void beforeJob(JobExecution jobExecution) {
try {
String jobName = jobExecution.getJobInstance().getJobName();
- T defaultProperties = propertyMap.getPropertyMap().get(jobName);
+ T defaultProperties = propertyMap.get(jobName);
if (defaultProperties == null)
throw new UnsupportedOperationException("Properties not found for job " + jobName);
T properties = defaultProperties.deepCopy();
properties.apply(jobExecution.getJobParameters());
- properties.validate();
+ properties.validate(jobName);
jobExecution.getExecutionContext().put("jobProperties", properties);
}
catch (UnsupportedOperationException | IllegalArgumentException ex) {
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java
similarity index 60%
rename from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
rename to ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java
index e7b9e77..af81b4f 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java
@@ -18,8 +18,32 @@
*/
package org.apache.ambari.infra.job;
-import java.util.Map;
+public class SchedulingProperties {
+ private boolean enabled = false;
+ private String cron;
+ private String intervalEndDelta;
-public interface PropertyMap<T extends JobProperties<T>> {
- Map<String, T> getPropertyMap();
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public String getCron() {
+ return cron;
+ }
+
+ public void setCron(String cron) {
+ this.cron = cron;
+ }
+
+ public String getIntervalEndDelta() {
+ return intervalEndDelta;
+ }
+
+ public void setIntervalEndDelta(String intervalEndDelta) {
+ this.intervalEndDelta = intervalEndDelta;
+ }
}
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
index 837b9c4..89f94bd 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
@@ -19,7 +19,9 @@
package org.apache.ambari.infra.job.archive;
import org.apache.ambari.infra.conf.InfraManagerDataConfig;
-import org.apache.ambari.infra.job.JobPropertyMap;
+import org.apache.ambari.infra.job.AbstractJobsConfiguration;
+import org.apache.ambari.infra.job.JobContextRepository;
+import org.apache.ambari.infra.job.JobScheduler;
import org.apache.ambari.infra.job.ObjectSource;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -31,55 +33,41 @@ import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
+import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import javax.annotation.PostConstruct;
import javax.inject.Inject;
import java.io.File;
import static org.apache.commons.lang.StringUtils.isBlank;
@Configuration
-public class DocumentArchivingConfiguration {
+public class DocumentArchivingConfiguration extends AbstractJobsConfiguration<DocumentArchivingProperties> {
private static final Logger LOG = LoggerFactory.getLogger(DocumentArchivingConfiguration.class);
private static final DocumentWiper NOT_DELETE = (firstDocument, lastDocument) -> { };
- @Inject
- private DocumentArchivingPropertyMap propertyMap;
-
- @Inject
- private StepBuilderFactory steps;
+ private final StepBuilderFactory steps;
+ private final Step exportStep;
@Inject
- private JobBuilderFactory jobs;
-
- @Inject
- @Qualifier("exportStep")
- private Step exportStep;
-
- @Inject
- private JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor;
-
-
- @PostConstruct
- public void createJobs() {
- if (propertyMap == null || propertyMap.getSolrDataArchiving() == null)
- return;
-
- propertyMap.getSolrDataArchiving().values().forEach(DocumentArchivingProperties::validate);
-
- propertyMap.getSolrDataArchiving().keySet().forEach(jobName -> {
- LOG.info("Registering data archiving job {}", jobName);
- Job job = logExportJob(jobName, exportStep);
- jobRegistryBeanPostProcessor.postProcessAfterInitialization(job, jobName);
- });
+ public DocumentArchivingConfiguration(
+ DocumentArchivingPropertyMap jobsPropertyMap,
+ JobScheduler scheduler,
+ StepBuilderFactory steps,
+ JobBuilderFactory jobs,
+ @Qualifier("exportStep") Step exportStep,
+ JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor) {
+ super(jobsPropertyMap.getSolrDataArchiving(), scheduler, jobs, jobRegistryBeanPostProcessor);
+ this.exportStep = exportStep;
+ this.steps = steps;
}
- private Job logExportJob(String jobName, Step logExportStep) {
- return jobs.get(jobName).listener(new JobPropertyMap<>(propertyMap)).start(logExportStep).build();
+ @Override
+ protected Job buildJob(JobBuilder jobBuilder) {
+ return jobBuilder.start(exportStep).build();
}
@Bean
@@ -93,11 +81,12 @@ public class DocumentArchivingConfiguration {
@Bean
@StepScope
public DocumentExporter documentExporter(DocumentItemReader documentItemReader,
- @Value("#{stepExecution.jobExecution.id}") String jobId,
+ @Value("#{stepExecution.jobExecution.jobId}") String jobId,
@Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentArchivingProperties properties,
InfraManagerDataConfig infraManagerDataConfig,
@Value("#{jobParameters[end]}") String intervalEnd,
- DocumentWiper documentWiper) {
+ DocumentWiper documentWiper,
+ JobContextRepository jobContextRepository) {
File baseDir = new File(infraManagerDataConfig.getDataFolder(), "exporting");
CompositeFileAction fileAction = new CompositeFileAction(new TarGzCompressor());
@@ -134,7 +123,7 @@ public class DocumentArchivingConfiguration {
documentItemReader,
firstDocument -> new LocalDocumentItemWriter(
outFile(properties.getSolr().getCollection(), destinationDirectory, fileNameSuffixFormatter.format(firstDocument)), itemWriterListener),
- properties.getWriteBlockSize());
+ properties.getWriteBlockSize(), jobContextRepository);
}
@Bean
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java
index b90402a..f8fa33b 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java
@@ -259,6 +259,7 @@ public class DocumentArchivingProperties extends JobProperties<DocumentArchiving
"The property hdfsDestinationDirectory can not be null or empty string when destination is set to %s!", HDFS.name()));
}
+ requireNonNull(solr, "No solr query was specified for archiving job!");
solr.validate();
}
}
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingPropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingPropertyMap.java
index 7d1a738..a009031 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingPropertyMap.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingPropertyMap.java
@@ -18,7 +18,6 @@
*/
package org.apache.ambari.infra.job.archive;
-import org.apache.ambari.infra.job.PropertyMap;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@@ -26,7 +25,7 @@ import java.util.Map;
@Configuration
@ConfigurationProperties(prefix = "infra-manager.jobs")
-public class DocumentArchivingPropertyMap implements PropertyMap<DocumentArchivingProperties> {
+public class DocumentArchivingPropertyMap {
private Map<String, DocumentArchivingProperties> solrDataArchiving;
public Map<String, DocumentArchivingProperties> getSolrDataArchiving() {
@@ -36,9 +35,4 @@ public class DocumentArchivingPropertyMap implements PropertyMap<DocumentArchivi
public void setSolrDataArchiving(Map<String, DocumentArchivingProperties> solrDataArchiving) {
this.solrDataArchiving = solrDataArchiving;
}
-
- @Override
- public Map<String, DocumentArchivingProperties> getPropertyMap() {
- return getSolrDataArchiving();
- }
}
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java
index 6106c20..d87fdea 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java
@@ -18,6 +18,10 @@
*/
package org.apache.ambari.infra.job.archive;
+import org.apache.ambari.infra.job.JobContextRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
@@ -30,15 +34,19 @@ import org.springframework.batch.repeat.RepeatStatus;
public class DocumentExporter implements Tasklet, StepExecutionListener {
+ private static final Logger LOG = LoggerFactory.getLogger(DocumentExporter.class);
+
private boolean complete = false;
private final ItemStreamReader<Document> documentReader;
private final DocumentDestination documentDestination;
private final int writeBlockSize;
+ private final JobContextRepository jobContextRepository;
- public DocumentExporter(ItemStreamReader<Document> documentReader, DocumentDestination documentDestination, int writeBlockSize) {
+ public DocumentExporter(ItemStreamReader<Document> documentReader, DocumentDestination documentDestination, int writeBlockSize, JobContextRepository jobContextRepository) {
this.documentReader = documentReader;
this.documentDestination = documentDestination;
this.writeBlockSize = writeBlockSize;
+ this.jobContextRepository = jobContextRepository;
}
@Override
@@ -58,7 +66,8 @@ public class DocumentExporter implements Tasklet, StepExecutionListener {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
- ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();
+ StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
+ ExecutionContext executionContext = stepExecution.getExecutionContext();
documentReader.open(executionContext);
DocumentItemWriter writer = null;
@@ -67,10 +76,19 @@ public class DocumentExporter implements Tasklet, StepExecutionListener {
Document document;
while ((document = documentReader.read()) != null) {
if (writer != null && writtenCount >= writeBlockSize) {
+ stepExecution = jobContextRepository.getStepExecution(stepExecution.getJobExecutionId(), stepExecution.getId());
+ if (stepExecution.getJobExecution().getStatus() == BatchStatus.STOPPING) {
+ LOG.info("Received stop signal.");
+ writer.revert();
+ writer = null;
+ return RepeatStatus.CONTINUABLE;
+ }
+
writer.close();
writer = null;
writtenCount = 0;
documentReader.update(executionContext);
+ jobContextRepository.updateExecutionContext(stepExecution);
}
if (writer == null)
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java
index 85ec00b..f9016e6 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java
@@ -26,7 +26,7 @@ import static org.apache.ambari.infra.job.archive.SolrDocumentIterator.SOLR_DATE
import static org.apache.commons.lang.StringUtils.isBlank;
public class FileNameSuffixFormatter {
- private static final DateTimeFormatter SOLR_DATETIME_FORMATTER = DateTimeFormatter.ofPattern(SOLR_DATE_FORMAT_TEXT);
+ public static final DateTimeFormatter SOLR_DATETIME_FORMATTER = DateTimeFormatter.ofPattern(SOLR_DATE_FORMAT_TEXT);
public static FileNameSuffixFormatter from(DocumentArchivingProperties properties) {
return new FileNameSuffixFormatter(properties.getFileNameSuffixColumn(), properties.getFileNameSuffixDateFormat());
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java
index ce8970d..4a68c49 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java
@@ -18,9 +18,8 @@
*/
package org.apache.ambari.infra.job.deleting;
-import org.apache.ambari.infra.job.JobPropertyMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.ambari.infra.job.AbstractJobsConfiguration;
+import org.apache.ambari.infra.job.JobScheduler;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
@@ -28,50 +27,36 @@ import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
+import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import javax.annotation.PostConstruct;
import javax.inject.Inject;
@Configuration
-public class DocumentDeletingConfiguration {
- private static final Logger LOG = LoggerFactory.getLogger(DocumentDeletingConfiguration.class);
+public class DocumentDeletingConfiguration extends AbstractJobsConfiguration<DocumentDeletingProperties> {
- @Inject
- private DocumentDeletingPropertyMap propertyMap;
-
- @Inject
- private StepBuilderFactory steps;
-
- @Inject
- private JobBuilderFactory jobs;
+ private final StepBuilderFactory steps;
+ private final Step deleteStep;
@Inject
- private JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor;
-
- @Inject
- @Qualifier("deleteStep")
- private Step deleteStep;
-
- @PostConstruct
- public void createJobs() {
- if (propertyMap == null || propertyMap.getSolrDataDeleting() == null)
- return;
-
- propertyMap.getSolrDataDeleting().values().forEach(DocumentDeletingProperties::validate);
-
- propertyMap.getSolrDataDeleting().keySet().forEach(jobName -> {
- LOG.info("Registering data deleting job {}", jobName);
- Job job = logDeleteJob(jobName, deleteStep);
- jobRegistryBeanPostProcessor.postProcessAfterInitialization(job, jobName);
- });
+ public DocumentDeletingConfiguration(
+ DocumentDeletingPropertyMap documentDeletingPropertyMap,
+ JobScheduler scheduler,
+ StepBuilderFactory steps,
+ JobBuilderFactory jobs,
+ JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor,
+ @Qualifier("deleteStep") Step deleteStep) {
+ super(documentDeletingPropertyMap.getSolrDataDeleting(), scheduler, jobs, jobRegistryBeanPostProcessor);
+ this.steps = steps;
+ this.deleteStep = deleteStep;
}
- private Job logDeleteJob(String jobName, Step logExportStep) {
- return jobs.get(jobName).listener(new JobPropertyMap<>(propertyMap)).start(logExportStep).build();
+ @Override
+ protected Job buildJob(JobBuilder jobBuilder) {
+ return jobBuilder.start(deleteStep).build();
}
@Bean
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingPropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingPropertyMap.java
index fccfd59..1dc0caf 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingPropertyMap.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingPropertyMap.java
@@ -18,7 +18,6 @@
*/
package org.apache.ambari.infra.job.deleting;
-import org.apache.ambari.infra.job.PropertyMap;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@@ -26,7 +25,7 @@ import java.util.Map;
@Configuration
@ConfigurationProperties(prefix = "infra-manager.jobs")
-public class DocumentDeletingPropertyMap implements PropertyMap<DocumentDeletingProperties> {
+public class DocumentDeletingPropertyMap {
private Map<String, DocumentDeletingProperties> solrDataDeleting;
public Map<String, DocumentDeletingProperties> getSolrDataDeleting() {
@@ -36,9 +35,4 @@ public class DocumentDeletingPropertyMap implements PropertyMap<DocumentDeleting
public void setSolrDataDeleting(Map<String, DocumentDeletingProperties> solrDataDeleting) {
this.solrDataDeleting = solrDataDeleting;
}
-
- @Override
- public Map<String, DocumentDeletingProperties> getPropertyMap() {
- return getSolrDataDeleting();
- }
}
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java
index 862119a..f35387d 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java
@@ -18,7 +18,6 @@
*/
package org.apache.ambari.infra.manager;
-import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import org.apache.ambari.infra.model.ExecutionContextResponse;
import org.apache.ambari.infra.model.JobDetailsResponse;
@@ -36,9 +35,13 @@ import org.springframework.batch.admin.service.JobService;
import org.springframework.batch.admin.service.NoSuchStepExecutionException;
import org.springframework.batch.admin.web.JobInfo;
import org.springframework.batch.admin.web.StepExecutionProgress;
-import org.springframework.batch.core.*;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobInstance;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.batch.core.JobParametersInvalidException;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobExecutionNotRunningException;
-import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.core.launch.NoSuchJobExecutionException;
@@ -51,16 +54,16 @@ import javax.inject.Inject;
import javax.inject.Named;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
@Named
-public class JobManager {
+public class JobManager implements Jobs {
private static final Logger LOG = LoggerFactory.getLogger(JobManager.class);
@@ -70,6 +73,9 @@ public class JobManager {
@Inject
private JobOperator jobOperator;
+ @Inject
+ private JobExplorer jobExplorer;
+
private TimeZone timeZone = TimeZone.getDefault();
public Set<String> getAllJobNames() {
@@ -80,18 +86,28 @@ public class JobManager {
* Launches a new job instance (based on job name) and applies customized parameters to it.
* Also adds a new date parameter to make sure the job instance will be unique.
*/
- public JobExecutionInfoResponse launchJob(String jobName, String params)
- throws JobParametersInvalidException, JobInstanceAlreadyExistsException, NoSuchJobException,
+ @Override
+ public JobExecutionInfoResponse launchJob(String jobName, JobParameters jobParameters)
+ throws JobParametersInvalidException, NoSuchJobException,
JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
- JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
- if (params != null) {
- LOG.info("Parsing parameters of job {} '{}'", jobName, params);
- Splitter.on(',')
- .trimResults()
- .withKeyValueSeparator(Splitter.on('=').limit(2).trimResults())
- .split(params).entrySet().forEach(entry -> jobParametersBuilder.addString(entry.getKey(), entry.getValue()));
- }
- return new JobExecutionInfoResponse(jobService.launch(jobName, jobParametersBuilder.toJobParameters()), timeZone);
+
+ Set<JobExecution> running = jobExplorer.findRunningJobExecutions(jobName);
+ if (!running.isEmpty())
+ throw new JobExecutionAlreadyRunningException("An instance of this job is already active: "+jobName);
+
+ return new JobExecutionInfoResponse(jobService.launch(jobName, jobParameters), timeZone);
+ }
+
+ @Override
+ public void restart(Long jobExecutionId)
+ throws JobInstanceAlreadyCompleteException, NoSuchJobException, JobExecutionAlreadyRunningException,
+ JobParametersInvalidException, JobRestartException, NoSuchJobExecutionException {
+ jobService.restart(jobExecutionId);
+ }
+
+ @Override
+ public Optional<JobExecution> lastRun(String jobName) throws NoSuchJobException {
+ return jobService.listJobExecutionsForJob(jobName, 0, 1).stream().findFirst();
}
/**
@@ -111,19 +127,14 @@ public class JobManager {
/**
* Gather job execution details by job execution id.
*/
- public JobExecutionDetailsResponse getExectionInfo(Long jobExecutionId) throws NoSuchJobExecutionException {
+ public JobExecutionDetailsResponse getExecutionInfo(Long jobExecutionId) throws NoSuchJobExecutionException {
JobExecution jobExecution = jobService.getJobExecution(jobExecutionId);
- List<StepExecutionInfoResponse> stepExecutionInfos = new ArrayList<StepExecutionInfoResponse>();
+ List<StepExecutionInfoResponse> stepExecutionInfoList = new ArrayList<>();
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
- stepExecutionInfos.add(new StepExecutionInfoResponse(stepExecution, timeZone));
+ stepExecutionInfoList.add(new StepExecutionInfoResponse(stepExecution, timeZone));
}
- Collections.sort(stepExecutionInfos, new Comparator<StepExecutionInfoResponse>() {
- @Override
- public int compare(StepExecutionInfoResponse o1, StepExecutionInfoResponse o2) {
- return o1.getId().compareTo(o2.getId());
- }
- });
- return new JobExecutionDetailsResponse(new JobExecutionInfoResponse(jobExecution, timeZone), stepExecutionInfos);
+ stepExecutionInfoList.sort(Comparator.comparing(StepExecutionInfoResponse::getId));
+ return new JobExecutionDetailsResponse(new JobExecutionInfoResponse(jobExecution, timeZone), stepExecutionInfoList);
}
/**
@@ -139,6 +150,7 @@ public class JobManager {
} else {
throw new UnsupportedOperationException("Unsupported operaration");
}
+ LOG.info("Job {} was marked {}", jobExecution.getJobInstance().getJobName(), operation.name());
return new JobExecutionInfoResponse(jobExecution, timeZone);
}
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java
new file mode 100644
index 0000000..b2ca605
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.manager;
+
+import org.apache.ambari.infra.model.JobExecutionInfoResponse;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.batch.core.JobParametersInvalidException;
+import org.springframework.batch.core.launch.NoSuchJobException;
+import org.springframework.batch.core.launch.NoSuchJobExecutionException;
+import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
+import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
+import org.springframework.batch.core.repository.JobRestartException;
+
+import java.util.Optional;
+
+public interface Jobs {
+ JobExecutionInfoResponse launchJob(String jobName, JobParameters params)
+ throws JobParametersInvalidException, NoSuchJobException,
+ JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException;
+ void restart(Long jobExecutionId)
+ throws JobInstanceAlreadyCompleteException, NoSuchJobException, JobExecutionAlreadyRunningException,
+ JobParametersInvalidException, JobRestartException, NoSuchJobExecutionException;
+
+ Optional<JobExecution> lastRun(String jobName) throws NoSuchJobException, NoSuchJobExecutionException;
+}
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/rest/JobResource.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/rest/JobResource.java
index 0e20b54..502057e 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/rest/JobResource.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/rest/JobResource.java
@@ -18,6 +18,7 @@
*/
package org.apache.ambari.infra.rest;
+import com.google.common.base.Splitter;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.ambari.infra.manager.JobManager;
@@ -35,11 +36,13 @@ import org.apache.ambari.infra.model.StepExecutionContextResponse;
import org.apache.ambari.infra.model.StepExecutionInfoResponse;
import org.apache.ambari.infra.model.StepExecutionProgressResponse;
import org.apache.ambari.infra.model.StepExecutionRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.batch.admin.service.NoSuchStepExecutionException;
import org.springframework.batch.admin.web.JobInfo;
+import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobExecutionNotRunningException;
-import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.core.launch.NoSuchJobExecutionException;
import org.springframework.batch.core.launch.NoSuchJobInstanceException;
@@ -67,6 +70,7 @@ import java.util.Set;
@Named
@Scope("request")
public class JobResource {
+ private static final Logger LOG = LoggerFactory.getLogger(JobResource.class);
@Inject
private JobManager jobManager;
@@ -83,9 +87,21 @@ public class JobResource {
@Path("{jobName}")
@ApiOperation("Start a new job instance by job name.")
public JobExecutionInfoResponse startJob(@BeanParam @Valid JobInstanceStartRequest request)
- throws JobParametersInvalidException, JobInstanceAlreadyExistsException, NoSuchJobException, JobExecutionAlreadyRunningException,
+ throws JobParametersInvalidException, NoSuchJobException, JobExecutionAlreadyRunningException,
JobRestartException, JobInstanceAlreadyCompleteException {
- return jobManager.launchJob(request.getJobName(), request.getParams());
+
+ String jobName = request.getJobName();
+ String params = request.getParams();
+ JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
+ if (params != null) {
+ LOG.info("Parsing parameters of job {} '{}'", jobName, params);
+ Splitter.on(',')
+ .trimResults()
+ .withKeyValueSeparator(Splitter.on('=').limit(2).trimResults())
+ .split(params).forEach(jobParametersBuilder::addString);
+ }
+
+ return jobManager.launchJob(jobName, jobParametersBuilder.toJobParameters());
}
@GET
@@ -117,7 +133,7 @@ public class JobResource {
@Path("/executions/{jobExecutionId}")
@ApiOperation("Get job and step details for job execution instance.")
public JobExecutionDetailsResponse getExectionInfo(@PathParam("jobExecutionId") @Valid Long jobExecutionId) throws NoSuchJobExecutionException {
- return jobManager.getExectionInfo(jobExecutionId);
+ return jobManager.getExecutionInfo(jobExecutionId);
}
@GET
@@ -150,8 +166,8 @@ public class JobResource {
@Produces({"application/json"})
@Path("/{jobName}/{jobInstanceId}/executions")
@ApiOperation("Get execution for job instance.")
- public List<JobExecutionInfoResponse> getExecutionsForInstance(@BeanParam @Valid JobExecutionRequest request) throws JobInstanceAlreadyCompleteException,
- NoSuchJobExecutionException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException, NoSuchJobException, NoSuchJobInstanceException {
+ public List<JobExecutionInfoResponse> getExecutionsForInstance(@BeanParam @Valid JobExecutionRequest request) throws
+ NoSuchJobException, NoSuchJobInstanceException {
return jobManager.getExecutionsForJobInstance(request.getJobName(), request.getJobInstanceId());
}
diff --git a/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties b/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties
index aea2b88..a0712ba 100644
--- a/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties
+++ b/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties
@@ -14,13 +14,14 @@
# limitations under the License.
infra-manager.batch.db.file=job-repository.db
-infra-manager.batch.db.init=true
+infra-manager.batch.db.init=false
infra-manager.batch.db.username=admin
infra-manager.batch.db.password=admin
management.security.enabled=false
management.health.solr.enabled=false
infra-manager.server.data.folder=/tmp/ambariInfraManager
+infra-manager.jobs.solr_data_archiving.archive_service_logs.enabled=true
infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.zoo_keeper_connection_string=zookeeper:2181
infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.collection=hadoop_logs
infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.query_text=logtime:[${start} TO ${end}]
@@ -33,6 +34,10 @@ infra-manager.jobs.solr_data_archiving.archive_service_logs.destination=LOCAL
infra-manager.jobs.solr_data_archiving.archive_service_logs.local_destination_directory=/tmp/ambariInfraManager
infra-manager.jobs.solr_data_archiving.archive_service_logs.file_name_suffix_column=logtime
infra-manager.jobs.solr_data_archiving.archive_service_logs.file_name_suffix_date_format=yyyy-MM-dd'T'HH-mm-ss.SSSX
+infra-manager.jobs.solr_data_archiving.archive_service_logs.scheduling.enabled=true
+infra-manager.jobs.solr_data_archiving.archive_service_logs.scheduling.cron=0 * * * * ?
+infra-manager.jobs.solr_data_archiving.archive_service_logs.scheduling.intervalEndDelta=PT24H
+infra-manager.jobs.solr_data_archiving.archive_audit_logs.enabled=true
infra-manager.jobs.solr_data_archiving.archive_audit_logs.solr.zoo_keeper_connection_string=zookeeper:2181
infra-manager.jobs.solr_data_archiving.archive_audit_logs.solr.collection=audit_logs
infra-manager.jobs.solr_data_archiving.archive_audit_logs.solr.query_text=logtime:[${start} TO ${end}]
@@ -63,6 +68,7 @@ infra-manager.jobs.solr_data_archiving.archive_audit_logs.s3_endpoint=http://fak
#infra-manager.jobs.solr_data_archiving.export_ranger_audit_logs.query.filter_query_text=(logtime:"${logtime}" AND id:{"${id}" TO *]) OR logtime:{"${logtime}" TO "${end}"]
#infra-manager.jobs.solr_data_archiving.export_ranger_audit_logs.query.sort_column[0]=logtime
#infra-manager.jobs.solr_data_archiving.export_ranger_audit_logs.query.sort_column[1]=id
+infra-manager.jobs.solr_data_deleting.delete_audit_logs.enabled=true
infra-manager.jobs.solr_data_deleting.delete_audit_logs.zoo_keeper_connection_string=zookeeper:2181
infra-manager.jobs.solr_data_deleting.delete_audit_logs.collection=audit_logs
infra-manager.jobs.solr_data_deleting.delete_audit_logs.filter_field=logtime
diff --git a/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml b/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml
index 9737554..d3db3d7 100644
--- a/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml
+++ b/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml
@@ -37,5 +37,8 @@
<AppenderRef ref="File" />
<AppenderRef ref="Console" />
</Root>
+ <!--<Logger name="org.springframework.jdbc.core.JdbcTemplate" level="debug">-->
+ <!--<AppenderRef ref="Console"/>-->
+ <!--</Logger>-->
</Loggers>
</Configuration>
diff --git a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java
new file mode 100644
index 0000000..ba1150f
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java
@@ -0,0 +1,114 @@
+package org.apache.ambari.infra.job;
+
+import org.apache.ambari.infra.manager.Jobs;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.support.CronTrigger;
+
+import javax.batch.operations.NoSuchJobException;
+import java.util.Optional;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.isA;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@RunWith(EasyMockRunner.class)
+public class JobSchedulerTest extends EasyMockSupport {
+
+ @Mock
+ private TaskScheduler taskScheduler;
+ @Mock
+ private Jobs jobs;
+ @Mock
+ private ScheduledFuture scheduledFuture;
+ private JobScheduler jobScheduler;
+
+ @Before
+ public void setUp() throws Exception {
+ jobScheduler = new JobScheduler(taskScheduler, jobs);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ verifyAll();
+ }
+
+ @Test(expected = NoSuchJobException.class)
+ public void testScheduleWhenJobNotExistsThrowsException() throws Exception {
+ String jobName = "notFoundJob";
+ expect(jobs.lastRun(jobName)).andThrow(new NoSuchJobException());
+ replayAll();
+
+ jobScheduler.schedule(jobName, null);
+ }
+
+ @Test
+ public void testScheduleWhenNoPreviousExecutionExistsJobIsScheduled() throws Exception {
+ String jobName = "job0";
+ SchedulingProperties schedulingProperties = new SchedulingProperties();
+ schedulingProperties.setCron("* * * * * ?");
+ expect(jobs.lastRun(jobName)).andReturn(Optional.empty());
+ expect(taskScheduler.schedule(isA(Runnable.class), eq(new CronTrigger(schedulingProperties.getCron())))).andReturn(scheduledFuture);
+ replayAll();
+
+ jobScheduler.schedule(jobName, schedulingProperties);
+ }
+
+ @Test
+ public void testScheduleWhenPreviousExecutionWasSuccessfulJobIsScheduled() throws Exception {
+ String jobName = "job0";
+ SchedulingProperties schedulingProperties = new SchedulingProperties();
+ schedulingProperties.setCron("* * * * * ?");
+ JobExecution jobExecution = new JobExecution(1L, new JobParameters());
+ jobExecution.setExitStatus(ExitStatus.COMPLETED);
+ expect(jobs.lastRun(jobName)).andReturn(Optional.of(jobExecution));
+ expect(taskScheduler.schedule(isA(Runnable.class), eq(new CronTrigger(schedulingProperties.getCron())))).andReturn(scheduledFuture);
+ replayAll();
+
+ jobScheduler.schedule(jobName, schedulingProperties);
+ }
+
+ @Test
+ public void testScheduleWhenPreviousExecutionFailedJobIsRestartedAndScheduled() throws Exception {
+ String jobName = "job0";
+ SchedulingProperties schedulingProperties = new SchedulingProperties();
+ schedulingProperties.setCron("* * * * * ?");
+ JobExecution jobExecution = new JobExecution(1L, new JobParameters());
+ jobExecution.setExitStatus(ExitStatus.FAILED);
+ expect(jobs.lastRun(jobName)).andReturn(Optional.of(jobExecution));
+ jobs.restart(1L); expectLastCall();
+ expect(taskScheduler.schedule(isA(Runnable.class), eq(new CronTrigger(schedulingProperties.getCron())))).andReturn(scheduledFuture);
+ replayAll();
+
+ jobScheduler.schedule(jobName, schedulingProperties);
+ }
+}
\ No newline at end of file
diff --git a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java
index 88fbff0..b31110c 100644
--- a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java
+++ b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java
@@ -19,6 +19,7 @@
package org.apache.ambari.infra.job.archive;
+import org.apache.ambari.infra.job.JobContextRepository;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
@@ -26,12 +27,14 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.scope.context.StepContext;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamReader;
+import org.springframework.batch.repeat.RepeatStatus;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -39,10 +42,20 @@ import java.util.HashMap;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
@RunWith(EasyMockRunner.class)
public class DocumentExporterTest extends EasyMockSupport {
+ private static final long JOB_EXECUTION_ID = 1L;
+ private static final long STEP_EXECUTION_ID = 1L;
+ private static final Document DOCUMENT_2 = new Document(new HashMap<String, String>() {{
+ put("id", "2");
+ }});
+ private static final Document DOCUMENT_3 = new Document(new HashMap<String, String>() {{
+ put("id", "3");
+ }});
private DocumentExporter documentExporter;
@Mock
private ItemStreamReader<Document> reader;
@@ -52,17 +65,30 @@ public class DocumentExporterTest extends EasyMockSupport {
private DocumentItemWriter documentItemWriter;
@Mock
private DocumentItemWriter documentItemWriter2;
+ @Mock
+ private DocumentItemWriter documentItemWriter3;
+ @Mock
+ private JobContextRepository jobContextRepository;
- private ExecutionContext executionContext;
+// private ExecutionContext executionContext;
private ChunkContext chunkContext;
private static final Document DOCUMENT = new Document(new HashMap<String, String>() {{ put("id", "1"); }});
@Before
public void setUp() throws Exception {
- StepExecution stepExecution = new StepExecution("exportDoc", new JobExecution(1L));
- chunkContext = new ChunkContext(new StepContext(stepExecution));
- executionContext = stepExecution.getExecutionContext();
- documentExporter = new DocumentExporter(reader, documentDestination, 2);
+ chunkContext = chunkContext(BatchStatus.STARTED);
+ documentExporter = documentExporter(2);
+ }
+
+ private DocumentExporter documentExporter(int writeBlockSize) {
+ return new DocumentExporter(reader, documentDestination, writeBlockSize, jobContextRepository);
+ }
+
+ private ChunkContext chunkContext(BatchStatus batchStatus) {
+ StepExecution stepExecution = new StepExecution("exportDoc", new JobExecution(JOB_EXECUTION_ID));
+ stepExecution.setId(STEP_EXECUTION_ID);
+ stepExecution.getJobExecution().setStatus(batchStatus);
+ return new ChunkContext(new StepContext(stepExecution));
}
@After
@@ -72,7 +98,7 @@ public class DocumentExporterTest extends EasyMockSupport {
@Test
public void testNothingToRead() throws Exception {
- reader.open(executionContext); expectLastCall();
+ reader.open(executionContext(chunkContext)); expectLastCall();
expect(reader.read()).andReturn(null);
reader.close(); expectLastCall();
replayAll();
@@ -80,9 +106,13 @@ public class DocumentExporterTest extends EasyMockSupport {
documentExporter.execute(null, chunkContext);
}
+ private ExecutionContext executionContext(ChunkContext chunkContext) {
+ return chunkContext.getStepContext().getStepExecution().getExecutionContext();
+ }
+
@Test
public void testWriteLessDocumentsThanWriteBlockSize() throws Exception {
- reader.open(executionContext); expectLastCall();
+ reader.open(executionContext(chunkContext)); expectLastCall();
expect(reader.read()).andReturn(DOCUMENT);
expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
documentItemWriter.write(DOCUMENT); expectLastCall();
@@ -91,36 +121,35 @@ public class DocumentExporterTest extends EasyMockSupport {
documentItemWriter.close(); expectLastCall();
replayAll();
- documentExporter.execute(null, chunkContext);
+ assertThat(documentExporter.execute(null, chunkContext), is(RepeatStatus.FINISHED));
}
@Test
public void testWriteMoreDocumentsThanWriteBlockSize() throws Exception {
- Document document2 = new Document(new HashMap<String, String>() {{ put("id", "2"); }});
- Document document3 = new Document(new HashMap<String, String>() {{ put("id", "3"); }});
-
- reader.open(executionContext); expectLastCall();
+ reader.open(executionContext(chunkContext)); expectLastCall();
expect(reader.read()).andReturn(DOCUMENT);
expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
documentItemWriter.write(DOCUMENT); expectLastCall();
- expect(reader.read()).andReturn(document2);
- documentItemWriter.write(document2); expectLastCall();
- expect(reader.read()).andReturn(document3);
+ expect(reader.read()).andReturn(DOCUMENT_2);
+ documentItemWriter.write(DOCUMENT_2); expectLastCall();
+ expect(reader.read()).andReturn(DOCUMENT_3);
documentItemWriter.close(); expectLastCall();
- expect(documentDestination.open(document3)).andReturn(documentItemWriter2);
- documentItemWriter2.write(document3); expectLastCall();
+ jobContextRepository.updateExecutionContext(chunkContext.getStepContext().getStepExecution());
+ expect(jobContextRepository.getStepExecution(JOB_EXECUTION_ID, STEP_EXECUTION_ID)).andReturn(chunkContext.getStepContext().getStepExecution());
+ expect(documentDestination.open(DOCUMENT_3)).andReturn(documentItemWriter2);
+ documentItemWriter2.write(DOCUMENT_3); expectLastCall();
expect(reader.read()).andReturn(null);
- reader.update(executionContext);
+ reader.update(executionContext(chunkContext));
reader.close(); expectLastCall();
documentItemWriter2.close(); expectLastCall();
replayAll();
- documentExporter.execute(null, chunkContext);
+ assertThat(documentExporter.execute(null, chunkContext), is(RepeatStatus.FINISHED));
}
@Test(expected = IOException.class)
public void testReadError() throws Exception {
- reader.open(executionContext); expectLastCall();
+ reader.open(executionContext(chunkContext)); expectLastCall();
expect(reader.read()).andReturn(DOCUMENT);
expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
documentItemWriter.write(DOCUMENT); expectLastCall();
@@ -134,7 +163,7 @@ public class DocumentExporterTest extends EasyMockSupport {
@Test(expected = UncheckedIOException.class)
public void testWriteError() throws Exception {
- reader.open(executionContext); expectLastCall();
+ reader.open(executionContext(chunkContext)); expectLastCall();
expect(reader.read()).andReturn(DOCUMENT);
expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
documentItemWriter.write(DOCUMENT); expectLastCall().andThrow(new UncheckedIOException(new IOException("TEST")));
@@ -144,4 +173,43 @@ public class DocumentExporterTest extends EasyMockSupport {
documentExporter.execute(null, chunkContext);
}
+
+ @Test
+ public void testStopAndRestartExportsAllDocuments() throws Exception {
+ ChunkContext stoppingChunkContext = chunkContext(BatchStatus.STOPPING);
+ DocumentExporter documentExporter = documentExporter(1);
+
+ reader.open(executionContext(chunkContext)); expectLastCall();
+ expect(reader.read()).andReturn(DOCUMENT);
+
+ expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
+ documentItemWriter.write(DOCUMENT); expectLastCall();
+ expect(reader.read()).andReturn(DOCUMENT_2);
+ expect(jobContextRepository.getStepExecution(JOB_EXECUTION_ID, STEP_EXECUTION_ID)).andReturn(chunkContext.getStepContext().getStepExecution());
+ documentItemWriter.close(); expectLastCall();
+ reader.update(executionContext(this.chunkContext));
+ jobContextRepository.updateExecutionContext(this.chunkContext.getStepContext().getStepExecution());
+
+ expect(documentDestination.open(DOCUMENT_2)).andReturn(documentItemWriter2);
+ documentItemWriter2.write(DOCUMENT_2); expectLastCall();
+ expect(reader.read()).andReturn(DOCUMENT_3);
+ expect(jobContextRepository.getStepExecution(JOB_EXECUTION_ID, STEP_EXECUTION_ID)).andReturn(stoppingChunkContext.getStepContext().getStepExecution());
+ documentItemWriter2.revert(); expectLastCall();
+ reader.close(); expectLastCall();
+
+ reader.open(executionContext(chunkContext));
+ expect(reader.read()).andReturn(DOCUMENT_3);
+ expect(documentDestination.open(DOCUMENT_3)).andReturn(documentItemWriter3);
+ documentItemWriter3.write(DOCUMENT_3); expectLastCall();
+ documentItemWriter3.close(); expectLastCall();
+
+ expect(reader.read()).andReturn(null);
+ reader.close(); expectLastCall();
+ replayAll();
+
+ RepeatStatus repeatStatus = documentExporter.execute(null, this.chunkContext);
+ assertThat(repeatStatus, is(RepeatStatus.CONTINUABLE));
+ repeatStatus = documentExporter.execute(null, this.chunkContext);
+ assertThat(repeatStatus, is(RepeatStatus.FINISHED));
+ }
}
\ No newline at end of file
diff --git a/ambari-infra/ambari-infra-manager/src/test/resoruces/vagrant-infra-manager.properties.sample b/ambari-infra/ambari-infra-manager/src/test/resoruces/vagrant-infra-manager.properties.sample
index f008a53..d722f0e 100644
--- a/ambari-infra/ambari-infra-manager/src/test/resoruces/vagrant-infra-manager.properties.sample
+++ b/ambari-infra/ambari-infra-manager/src/test/resoruces/vagrant-infra-manager.properties.sample
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
infra-manager.batch.db.file=job-repository.db
infra-manager.batch.db.init=true
infra-manager.batch.db.username=admin
@@ -20,18 +21,19 @@ management.security.enabled=false
management.health.solr.enabled=false
infra-manager.server.data.folder=/tmp/ambariInfraManager
-infra-manager.jobs.solr_data_export.archive_service_logs.solr.zoo_keeper_connection_string=c6401.ambari.apache.org:2181/infra-solr
-infra-manager.jobs.solr_data_export.archive_service_logs.solr.collection=hadoop_logs
-infra-manager.jobs.solr_data_export.archive_service_logs.solr.query_text=logtime:[${start} TO ${end}]
-infra-manager.jobs.solr_data_export.archive_service_logs.solr.filter_query_text=(logtime:${logtime} AND id:{${id} TO *]) OR logtime:{${logtime} TO ${end}]
-infra-manager.jobs.solr_data_export.archive_service_logs.solr.sort_column[0]=logtime
-infra-manager.jobs.solr_data_export.archive_service_logs.solr.sort_column[1]=id
-infra-manager.jobs.solr_data_export.archive_service_logs.solr.delete_query_text=logtime:[${start.logtime} TO ${end.logtime}} OR (logtime:${end.logtime} AND id:[* TO ${end.id}])
-infra-manager.jobs.solr_data_export.archive_service_logs.read_block_size=2000
-infra-manager.jobs.solr_data_export.archive_service_logs.write_block_size=1000
-infra-manager.jobs.solr_data_export.archive_service_logs.destination=HDFS
-infra-manager.jobs.solr_data_export.archive_service_logs.file_name_suffix_column=logtime
-infra-manager.jobs.solr_data_export.archive_service_logs.file_name_suffix_date_format=yyyy-MM-dd'T'HH-mm-ss.SSSX
-infra-manager.jobs.solr_data_export.archive_service_logs.hdfs_endpoint=hdfs://c6401.ambari.apache.org:8020
-infra-manager.jobs.solr_data_export.archive_service_logs.hdfs_destination_directory=/archived_service_logs
+infra-manager.jobs.solr_data_archiving.archive_service_logs.enabled=true
+infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.zoo_keeper_connection_string=c6401.ambari.apache.org:2181/infra-solr
+infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.collection=hadoop_logs
+infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.query_text=logtime:[${start} TO ${end}]
+infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.filter_query_text=(logtime:${logtime} AND id:{${id} TO *]) OR logtime:{${logtime} TO ${end}]
+infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.sort_column[0]=logtime
+infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.sort_column[1]=id
+infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.delete_query_text=logtime:[${start.logtime} TO ${end.logtime}} OR (logtime:${end.logtime} AND id:[* TO ${end.id}])
+infra-manager.jobs.solr_data_archiving.archive_service_logs.read_block_size=2000
+infra-manager.jobs.solr_data_archiving.archive_service_logs.write_block_size=1000
+infra-manager.jobs.solr_data_archiving.archive_service_logs.destination=HDFS
+infra-manager.jobs.solr_data_archiving.archive_service_logs.file_name_suffix_column=logtime
+infra-manager.jobs.solr_data_archiving.archive_service_logs.file_name_suffix_date_format=yyyy-MM-dd'T'HH-mm-ss.SSSX
+infra-manager.jobs.solr_data_archiving.archive_service_logs.hdfs_endpoint=hdfs://c6401.ambari.apache.org:8020
+infra-manager.jobs.solr_data_archiving.archive_service_logs.hdfs_destination_directory=/archived_service_logs
# Note: set hdfs user using the HADOOP_USER_NAME environmental variable. Value: hdfs
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
oleewere@apache.org.