You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2018/01/02 16:56:02 UTC
[11/37] ambari git commit: AMBARI-22514,
AMBARI-22653. Ambari Infra Manager: solr data exporting jobs and
integration test environment. (Krisztian Kasa via swagle)
AMBARI-22514, AMBARI-22653. Ambari Infra Manager: solr data exporting jobs and integration test environment. (Krisztian Kasa via swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/36d0271f
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/36d0271f
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/36d0271f
Branch: refs/heads/branch-feature-AMBARI-21674
Commit: 36d0271f74a70f5cfeca0e5ca0ebeb795fab6138
Parents: a15fc7f
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Thu Dec 21 13:24:03 2017 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Thu Dec 21 13:24:03 2017 -0800
----------------------------------------------------------------------
ambari-infra/ambari-infra-manager-it/pom.xml | 155 +++++++++++++
.../org/apache/ambari/infra/InfraClient.java | 93 ++++++++
.../ambari/infra/InfraManagerStories.java | 108 +++++++++
.../ambari/infra/OffsetDateTimeConverter.java | 39 ++++
.../ambari/infra/steps/AbstractInfraSteps.java | 223 +++++++++++++++++++
.../ambari/infra/steps/ExportJobsSteps.java | 106 +++++++++
.../src/test/resources/log4j.properties | 16 ++
.../resources/stories/infra_api_tests.story | 23 ++
.../ambari-infra-manager/docker/Dockerfile | 6 +-
.../docker/docker-compose.yml | 81 +++++++
.../docker/infra-manager-docker-compose.sh | 105 +++++++++
.../apache/ambari/infra/job/ObjectSource.java | 23 ++
.../infra/job/archive/AbstractFileAction.java | 33 +++
.../infra/job/archive/CompositeFileAction.java | 7 +-
.../ambari/infra/job/archive/Document.java | 1 -
.../archive/DocumentExportConfiguration.java | 74 +++---
.../job/archive/DocumentExportJobListener.java | 23 ++
.../job/archive/DocumentExportProperties.java | 140 +++++++++---
.../job/archive/DocumentExportPropertyMap.java | 38 ++++
.../job/archive/DocumentExportStepListener.java | 47 ----
.../infra/job/archive/DocumentItemReader.java | 8 +-
.../infra/job/archive/DocumentIterator.java | 5 +-
.../infra/job/archive/DocumentSource.java | 7 +-
.../ambari/infra/job/archive/FileAction.java | 2 +-
.../job/archive/LocalDocumentItemWriter.java | 8 +-
.../ambari/infra/job/archive/S3Properties.java | 57 ++---
.../ambari/infra/job/archive/S3Uploader.java | 23 +-
.../infra/job/archive/SolrDocumentIterator.java | 3 +-
.../infra/job/archive/SolrDocumentSource.java | 22 +-
.../infra/job/archive/SolrQueryBuilder.java | 28 ++-
.../infra/job/archive/SolrQueryProperties.java | 40 +++-
.../infra/job/archive/TarGzCompressor.java | 2 +-
.../src/main/resources/infra-manager.properties | 48 +++-
.../archive/DocumentExportPropertiesTest.java | 54 +++++
.../job/archive/DocumentItemReaderTest.java | 8 +-
.../archive/LocalDocumentItemWriterTest.java | 8 +-
.../infra/job/archive/SolrQueryBuilderTest.java | 18 +-
.../job/archive/SolrQueryPropertiesTest.java | 54 +++++
ambari-infra/pom.xml | 5 +-
39 files changed, 1532 insertions(+), 209 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager-it/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager-it/pom.xml b/ambari-infra/ambari-infra-manager-it/pom.xml
new file mode 100644
index 0000000..97e8ea0
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager-it/pom.xml
@@ -0,0 +1,155 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <artifactId>ambari-infra</artifactId>
+ <groupId>org.apache.ambari</groupId>
+ <version>2.0.0.0-SNAPSHOT</version>
+ </parent>
+
+ <name>Ambari Infra Manager Integration Tests</name>
+ <url>http://maven.apache.org</url>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>ambari-infra-manager-it</artifactId>
+
+ <!--
+ Module-level properties. docker.host and stories.location are forwarded to the test JVM as
+ system properties by the "it" profile of this pom; stories.location=NONE is the value the
+ story runner (InfraManagerStories) treats as "no external story folder, load from classpath".
+ -->
+ <properties>
+ <jbehave.version>4.0.5</jbehave.version>
+ <failsafe-plugin.version>2.20</failsafe-plugin.version>
+ <docker.host>localhost</docker.host>
+ <stories.location>NONE</stories.location>
+ </properties>
+
+ <dependencies>
+ <!-- Client libraries the test steps use to talk to Solr and to the S3 endpoint -->
+ <dependency>
+ <groupId>org.apache.solr</groupId>
+ <artifactId>solr-solrj</artifactId>
+ <version>${solr.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-s3</artifactId>
+ <version>1.11.5</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.5</version>
+ </dependency>
+
+ <!-- Logging: slf4j API bound to log4j 1.2 (configured through log4j.properties) -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.20</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.20</version>
+ </dependency>
+
+ <!-- Test-scoped: story runner, unit test framework, mocking and matchers -->
+ <dependency>
+ <groupId>org.jbehave</groupId>
+ <artifactId>jbehave-core</artifactId>
+ <version>${jbehave.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <!-- No version given here; presumably managed by the parent pom (TODO confirm) -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <version>3.4</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <!--
+ Test classes and test resources are written to target/classes instead of the Maven default
+ (target/test-classes). Besides src/test/resources, any *.story file placed next to the test
+ sources under src/test/java is also copied as a test resource.
+ -->
+ <build>
+ <testOutputDirectory>target/classes</testOutputDirectory>
+ <testResources>
+ <testResource>
+ <directory>src/test/java/</directory>
+ <includes>
+ <include>**/*.story</include>
+ </includes>
+ </testResource>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ </build>
+
+ <profiles>
+ <!--
+ Integration test profile, activated when the "it" property is defined (e.g. -Dit).
+ Runs every *Stories.java class with maven-failsafe-plugin in the integration-test phase
+ and evaluates the results in the verify phase.
+ -->
+ <profile>
+ <id>it</id>
+ <activation>
+ <property>
+ <name>it</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>${failsafe-plugin.version}</version>
+ <executions>
+ <execution>
+ <id>run-integration-tests</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>integration-test</goal>
+ </goals>
+ <configuration>
+ <includes>
+ <include>**/*Stories.java</include>
+ </includes>
+ <!-- System properties handed to the test JVM: log4j config, docker host and external story folder -->
+ <systemPropertyVariables>
+ <log4j.configuration>file:${project.build.testOutputDirectory}/log4j.properties</log4j.configuration>
+ <docker.host>${docker.host}</docker.host>
+ <backend.stories.location>${stories.location}</backend.stories.location>
+ </systemPropertyVariables>
+ </configuration>
+ </execution>
+ <execution>
+ <id>verify-integration-tests</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+</project>
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
new file mode 100644
index 0000000..0e391a3
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+// TODO: use swagger
+/**
+ * Minimal HTTP client for the Infra Manager jobs REST endpoint, used by the integration tests.
+ * Instances own an underlying {@link CloseableHttpClient} and must be closed after use.
+ */
+public class InfraClient implements AutoCloseable {
+  private static final Logger LOG = LoggerFactory.getLogger(InfraClient.class);
+  // Response bodies are decoded as UTF-8 instead of the platform default so results do not depend on the host locale.
+  private static final Charset RESPONSE_CHARSET = Charset.forName("UTF-8");
+
+  private final CloseableHttpClient httpClient;
+  private final URI baseUrl;
+
+  /**
+   * @param baseUrl URL of the jobs endpoint; every request is issued against it
+   * @throws RuntimeException if {@code baseUrl} is not a syntactically valid URI
+   */
+  public InfraClient(String baseUrl) {
+    try {
+      this.baseUrl = new URI(baseUrl);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+    // Automatic retries are switched off (retry count 0); callers that want retries poll on their own.
+    httpClient = HttpClientBuilder.create().setRetryHandler(new DefaultHttpRequestRetryHandler(0, false)).build();
+  }
+
+  /** Closes the underlying HTTP client. */
+  @Override
+  public void close() throws Exception {
+    httpClient.close();
+  }
+
+  /**
+   * Issues a GET against the base URL. The response is only logged, not returned.
+   */
+  // TODO: return job data
+  public void getJobs() {
+    execute(new HttpGet(baseUrl));
+  }
+
+  /**
+   * Executes the request, logs status code and body, and returns the body text.
+   *
+   * @param request the request to send
+   * @return the response body decoded as UTF-8, or an empty string when the response carries no entity
+   * @throws RuntimeException on protocol errors
+   * @throws UncheckedIOException on any other I/O failure
+   */
+  private String execute(HttpRequestBase request) {
+    try (CloseableHttpResponse response = httpClient.execute(request)) {
+      // A response without a body has no entity; guard against it instead of failing with a NullPointerException.
+      String responseBodyText = response.getEntity() == null
+          ? ""
+          : IOUtils.toString(response.getEntity().getContent(), RESPONSE_CHARSET);
+      LOG.info("Response code {} body {} ", response.getStatusLine().getStatusCode(), responseBodyText);
+      return responseBodyText;
+    } catch (ClientProtocolException e) {
+      throw new RuntimeException(e);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Issues a POST against {@code <baseUrl>/<jobName>}, optionally passing job parameters.
+   * The response is only logged, not returned.
+   *
+   * @param jobName    name of the job, appended to the base URL path
+   * @param parameters value of the {@code params} query parameter; omitted when blank
+   * @throws RuntimeException if the resulting URI is invalid
+   */
+  // TODO: return job data
+  public void startJob(String jobName, String parameters) {
+    URIBuilder uriBuilder = new URIBuilder(baseUrl);
+    // Only default the scheme; overriding it unconditionally would silently downgrade an https base URL.
+    if (uriBuilder.getScheme() == null)
+      uriBuilder.setScheme("http");
+    // A base URL without a path yields null here, which would otherwise be concatenated as the text "null".
+    String basePath = uriBuilder.getPath() == null ? "" : uriBuilder.getPath();
+    uriBuilder.setPath(basePath + "/" + jobName);
+    if (!isBlank(parameters))
+      uriBuilder.addParameter("params", parameters);
+    try {
+      execute(new HttpPost(uriBuilder.build()));
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraManagerStories.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraManagerStories.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraManagerStories.java
new file mode 100644
index 0000000..cf720ef
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraManagerStories.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra;
+
+import com.google.common.collect.Lists;
+import org.apache.ambari.infra.steps.ExportJobsSteps;
+import org.apache.commons.lang.StringUtils;
+import org.jbehave.core.configuration.Configuration;
+import org.jbehave.core.configuration.MostUsefulConfiguration;
+import org.jbehave.core.io.LoadFromClasspath;
+import org.jbehave.core.io.LoadFromRelativeFile;
+import org.jbehave.core.io.StoryFinder;
+import org.jbehave.core.io.StoryLoader;
+import org.jbehave.core.junit.JUnitStories;
+import org.jbehave.core.reporters.Format;
+import org.jbehave.core.reporters.StoryReporterBuilder;
+import org.jbehave.core.steps.InjectableStepsFactory;
+import org.jbehave.core.steps.InstanceStepsFactory;
+import org.jbehave.core.steps.ParameterConverters;
+
+import java.io.File;
+import java.net.URL;
+import java.util.List;
+
+import static java.util.Collections.singletonList;
+import static org.jbehave.core.io.CodeLocations.codeLocationFromClass;
+
+/**
+ * JBehave/JUnit entry point of the Infra Manager integration tests.
+ * Stories are loaded from the folder named by the {@code backend.stories.location} system property,
+ * or from the classpath when that property is empty or set to {@code NONE}.
+ */
+public class InfraManagerStories extends JUnitStories {
+  private static final String BACKEND_STORIES_LOCATION_PROPERTY = "backend.stories.location";
+  private static final String STORY_SUFFIX = ".story";
+
+  /** Builds the JBehave configuration: story loader, date-time parameter converter and reporters. */
+  @Override
+  public Configuration configuration() {
+    return new MostUsefulConfiguration()
+        .useStoryLoader(getStoryLoader(BACKEND_STORIES_LOCATION_PROPERTY, this.getClass()))
+        .useParameterConverters(new ParameterConverters().addConverters(new OffsetDateTimeConverter()))
+        .useStoryReporterBuilder(
+            new StoryReporterBuilder().withFailureTrace(true).withDefaultFormats().withFormats(Format.CONSOLE, Format.TXT));
+  }
+
+  /**
+   * Chooses the story loader: a file based one when an external story folder is configured,
+   * otherwise a classpath based one.
+   *
+   * @param property name of the system property holding the external story folder
+   * @param clazz    class used for classpath loading
+   * @throws RuntimeException if the external location cannot be turned into a loader
+   */
+  private static StoryLoader getStoryLoader(String property, Class<?> clazz) {
+    if (!useExternalStoryLocation(property)) {
+      return new LoadFromClasspath(clazz);
+    }
+    String storyUrl = "file://" + System.getProperty(property);
+    try {
+      return new LoadFromRelativeFile(new URL(storyUrl));
+    } catch (Exception e) {
+      // Keep the original failure as the cause so the real reason is not lost.
+      throw new RuntimeException("Cannot load story files from url: " + storyUrl, e);
+    }
+  }
+
+  /** Registers the step definitions executed by the stories. */
+  @Override
+  public InjectableStepsFactory stepsFactory() {
+    return new InstanceStepsFactory(configuration(), new ExportJobsSteps());
+  }
+
+  /** Returns the paths of all story files to run. */
+  @Override
+  protected List<String> storyPaths() {
+    return findStories(BACKEND_STORIES_LOCATION_PROPERTY, STORY_SUFFIX, this.getClass());
+  }
+
+  /**
+   * Finds story files either in the external folder or, when none is configured,
+   * below the code location of {@code clazz}.
+   */
+  private static List<String> findStories(String property, String suffix, Class<?> clazz) {
+    if (useExternalStoryLocation(property)) {
+      return findStoriesInFolder(System.getProperty(property), suffix);
+    } else {
+      return new StoryFinder()
+          .findPaths(codeLocationFromClass(clazz).getFile(), singletonList(String.format("**/*%s", suffix)), null);
+    }
+  }
+
+  /**
+   * Lists the names (not full paths) of the entries directly inside the folder whose name ends with the suffix.
+   * Returns an empty list when the folder cannot be listed.
+   */
+  private static List<String> findStoriesInFolder(String folderAbsolutePath, String suffix) {
+    List<String> results = Lists.newArrayList();
+    File folder = new File(folderAbsolutePath);
+    File[] listOfFiles = folder.listFiles();
+    if (listOfFiles != null) {
+      for (File file : listOfFiles) {
+        if (file.getName().endsWith(suffix)) {
+          results.add(file.getName());
+        }
+      }
+    }
+    return results;
+  }
+
+  /** An external location is used when the property is non-empty and not the literal {@code NONE}. */
+  private static boolean useExternalStoryLocation(String property) {
+    String storyLocationProp = System.getProperty(property);
+    return StringUtils.isNotEmpty(storyLocationProp) && !"NONE".equals(storyLocationProp);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/OffsetDateTimeConverter.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/OffsetDateTimeConverter.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/OffsetDateTimeConverter.java
new file mode 100644
index 0000000..9db562c
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/OffsetDateTimeConverter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra;
+
+import org.jbehave.core.steps.ParameterConverters;
+
+import java.lang.reflect.Type;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+
+public class OffsetDateTimeConverter implements ParameterConverters.ParameterConverter {
+ private static final DateTimeFormatter SOLR_DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX");
+
+ @Override
+ public boolean accept(Type type) {
+ return type instanceof Class<?> && OffsetDateTime.class.isAssignableFrom((Class<?>) type);
+ }
+
+ @Override
+ public Object convertValue(String value, Type type) {
+ return OffsetDateTime.parse(value, SOLR_DATETIME_FORMATTER);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java
new file mode 100644
index 0000000..703e1cf
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.steps;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import org.apache.ambari.infra.InfraClient;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
+import org.apache.solr.common.SolrInputDocument;
+import org.jbehave.core.annotations.AfterStories;
+import org.jbehave.core.annotations.BeforeStories;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.time.OffsetDateTime;
+import java.util.Date;
+import java.util.UUID;
+import java.util.function.BooleanSupplier;
+
+import static java.lang.System.currentTimeMillis;
+
+/**
+ * Base class of the JBehave step definitions. Before the stories it starts the docker based test
+ * environment through the docker-compose helper script, creates the Solr collection and the S3 bucket
+ * and waits for the Infra Manager; after the stories it stops the containers again.
+ */
+public abstract class AbstractInfraSteps {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractInfraSteps.class);
+
+  private static final int SOLR_PORT = 8983;
+  private static final int INFRA_MANAGER_PORT = 61890;
+  private static final int FAKE_S3_PORT = 4569;
+  private static final String AUDIT_LOGS_COLLECTION = "audit_logs";
+  protected static final String S3_BUCKET_NAME = "testbucket";
+  private String ambariFolder;
+  private String shellScriptLocation;
+  private String dockerHost;
+  private SolrClient solrClient;
+  private AmazonS3Client s3client;
+
+  /**
+   * Creates a new client for the Infra Manager jobs endpoint on every call;
+   * the caller is responsible for closing it.
+   */
+  public InfraClient getInfraClient() {
+    return new InfraClient(String.format("http://%s:%d/api/v1/jobs", dockerHost, INFRA_MANAGER_PORT));
+  }
+
+  /** Returns the shared Solr client; {@code null} until {@link #initDockerContainer()} has created it. */
+  public SolrClient getSolrClient() {
+    return solrClient;
+  }
+
+  /** Returns the shared S3 client; {@code null} until {@link #initDockerContainer()} has created it. */
+  public AmazonS3Client getS3client() {
+    return s3client;
+  }
+
+  /**
+   * Starts the containers and prepares Solr, S3 and the Infra Manager connection for the stories.
+   */
+  @BeforeStories
+  public void initDockerContainer() throws Exception {
+    LOG.info("Create new docker container for testing Ambari Infra Manager ...");
+    // The ambari root folder is derived by walking four levels up from the compiled test classes.
+    URL location = AbstractInfraSteps.class.getProtectionDomain().getCodeSource().getLocation();
+    ambariFolder = new File(location.toURI()).getParentFile().getParentFile().getParentFile().getParent();
+    shellScriptLocation = ambariFolder + "/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh";
+
+    runCommand(new String[]{shellScriptLocation, "start"});
+
+    dockerHost = System.getProperty("docker.host", "localhost");
+
+    waitUntilSolrIsUp();
+
+    solrClient = new LBHttpSolrClient.Builder().withBaseSolrUrls(String.format("http://%s:%d/solr/%s_shard1_replica1",
+        dockerHost,
+        SOLR_PORT,
+        AUDIT_LOGS_COLLECTION)).build();
+
+    LOG.info("Creating collection");
+    runCommand(new String[]{"docker", "exec", "docker_solr_1", "solr", "create_collection", "-c", AUDIT_LOGS_COLLECTION, "-d", "configsets/"+ AUDIT_LOGS_COLLECTION +"/conf", "-n", AUDIT_LOGS_COLLECTION + "_conf"});
+
+    LOG.info("Initializing s3 client");
+    s3client = new AmazonS3Client(new BasicAWSCredentials("remote-identity", "remote-credential"));
+    s3client.setEndpoint(String.format("http://%s:%d", dockerHost, FAKE_S3_PORT));
+    s3client.createBucket(S3_BUCKET_NAME);
+
+    checkInfraManagerReachable();
+  }
+
+  /**
+   * Runs an external command, waits for it to finish and logs its combined stdout/stderr output.
+   * A non-zero exit code is logged as a warning but does not fail the caller.
+   *
+   * @param command program and arguments
+   * @throws RuntimeException if the command cannot be started, its output cannot be read,
+   *                          or the thread is interrupted while waiting
+   */
+  protected void runCommand(String[] command) {
+    try {
+      LOG.info("Exec command: {}", StringUtils.join(command, " "));
+      // stderr is merged into stdout so it is drained too; an undrained stderr pipe can block the child process.
+      Process process = new ProcessBuilder(command).redirectErrorStream(true).start();
+      String stdout = IOUtils.toString(process.getInputStream(), StandardCharsets.UTF_8);
+      int exitCode = process.waitFor();
+      LOG.info("Exec command result {}", stdout);
+      if (exitCode != 0)
+        LOG.warn("Command '{}' exited with code {}", StringUtils.join(command, " "), exitCode);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag before translating the exception.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Error during execute shell command: ", e);
+    } catch (Exception e) {
+      throw new RuntimeException("Error during execute shell command: ", e);
+    }
+  }
+
+  /** Polls Solr for up to 60 seconds until it answers with HTTP 200. */
+  private void waitUntilSolrIsUp() throws Exception {
+    try(CloseableHttpClient httpClient = HttpClientBuilder.create().setRetryHandler(new DefaultHttpRequestRetryHandler(0, false)).build()) {
+      doWithin(60, "Start Solr", () -> pingSolr(httpClient));
+    }
+  }
+
+  /**
+   * Retries until the predicate returns {@code true} or the time limit is exceeded.
+   *
+   * @see #doWithin(int, String, Runnable)
+   */
+  protected void doWithin(int sec, String actionName, BooleanSupplier predicate) {
+    doWithin(sec, actionName, () -> {
+      if (!predicate.getAsBoolean())
+        throw new RuntimeException("Predicate was false!");
+    });
+  }
+
+  /**
+   * Runs the action once per second until it completes without throwing an {@link Exception}.
+   *
+   * @param sec        time limit in seconds
+   * @param actionName name used in log and failure messages
+   * @param runnable   the action; an exception thrown by it counts as a failed attempt
+   * @throws AssertionError   if the action still fails after the time limit; the last failure is the cause
+   * @throws RuntimeException if the thread is interrupted while waiting between attempts
+   */
+  protected void doWithin(int sec, String actionName, Runnable runnable) {
+    long start = currentTimeMillis();
+    Exception exception;
+    while (true) {
+      try {
+        runnable.run();
+        return;
+      }
+      catch (Exception e) {
+        exception = e;
+      }
+
+      // Multiply in long arithmetic so a large limit cannot overflow int.
+      if (currentTimeMillis() - start > sec * 1000L) {
+        throw new AssertionError(String.format("Unable to perform action '%s' within %d seconds", actionName, sec), exception);
+      }
+      else {
+        LOG.info("Performing action '{}' failed. retrying...", actionName);
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /** Returns whether the Solr collections LIST call answers with HTTP 200; I/O failures are rethrown unchecked. */
+  private boolean pingSolr(CloseableHttpClient httpClient) {
+    try (CloseableHttpResponse response = httpClient.execute(new HttpGet(String.format("http://%s:%d/solr/admin/collections?action=LIST", dockerHost, SOLR_PORT)))) {
+      return response.getStatusLine().getStatusCode() == 200;
+    }
+    catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /** Polls the Infra Manager jobs endpoint for up to 30 seconds until a request goes through. */
+  private void checkInfraManagerReachable() throws Exception {
+    try (InfraClient httpClient = getInfraClient()) {
+      doWithin(30, "Start Ambari Infra Manager", httpClient::getJobs);
+      LOG.info("Ambari Infra Manager is up and running");
+    }
+  }
+
+  /**
+   * Adds one sample HDFS audit log document to Solr. All fields are fixed except the random
+   * {@code id} and the {@code logtime}. No explicit commit is issued here.
+   *
+   * @param logtime value stored in the {@code logtime} field
+   */
+  protected void addDocument(OffsetDateTime logtime) throws SolrServerException, IOException {
+    SolrInputDocument solrInputDocument = new SolrInputDocument();
+    solrInputDocument.addField("logType", "HDFSAudit");
+    solrInputDocument.addField("cluster", "cl1");
+    solrInputDocument.addField("event_count", 1);
+    solrInputDocument.addField("repo", "hdfs");
+    solrInputDocument.addField("reqUser", "ambari-qa");
+    solrInputDocument.addField("type", "hdfs_audit");
+    solrInputDocument.addField("seq_num", 9);
+    solrInputDocument.addField("result", 1);
+    solrInputDocument.addField("path", "/root/test-logs/hdfs-audit/hdfs-audit.log");
+    solrInputDocument.addField("ugi", "ambari-qa (auth:SIMPLE)");
+    solrInputDocument.addField("host", "logfeeder.apache.org");
+    solrInputDocument.addField("action", "getfileinfo");
+    solrInputDocument.addField("log_message", "allowed=true\tugi=ambari-qa (auth:SIMPLE)\tip=/192.168.64.102\tcmd=getfileinfo\tsrc=/ats/active\tdst=null\tperm=null\tproto=rpc\tcallerContext=HIVE_QUERY_ID:ambari-qa_20160317200111_223b3079-4a2d-431c-920f-6ba37ed63e9f");
+    solrInputDocument.addField("logger_name", "FSNamesystem.audit");
+    solrInputDocument.addField("id", UUID.randomUUID().toString());
+    solrInputDocument.addField("authType", "SIMPLE");
+    solrInputDocument.addField("logfile_line_number", 1);
+    solrInputDocument.addField("cliIP", "/192.168.64.102");
+    solrInputDocument.addField("level", "INFO");
+    solrInputDocument.addField("resource", "/ats/active");
+    solrInputDocument.addField("ip", "172.18.0.2");
+    solrInputDocument.addField("evtTime", "2017-12-08T10:23:16.452Z");
+    solrInputDocument.addField("req_caller_id", "HIVE_QUERY_ID:ambari-qa_20160317200111_223b3079-4a2d-431c-920f-6ba37ed63e9f");
+    solrInputDocument.addField("repoType", 1);
+    solrInputDocument.addField("enforcer", "hadoop-acl");
+    solrInputDocument.addField("cliType", "rpc");
+    solrInputDocument.addField("message_md5", "-6778765776916226588");
+    solrInputDocument.addField("event_md5", "5627261521757462732");
+    solrInputDocument.addField("logtime", Date.from(logtime.toInstant()));
+    solrInputDocument.addField("_ttl_", "+7DAYS");
+    solrInputDocument.addField("_expire_at_", "2017-12-15T10:23:19.106Z");
+    solrClient.add(solrInputDocument);
+  }
+
+  /**
+   * Logs the objects found in the S3 test bucket, then stops the containers and closes the Solr client.
+   * The containers are stopped even when the S3 listing fails or the S3 client was never created.
+   */
+  @AfterStories
+  public void shutdownContainers() throws Exception {
+    try {
+      if (getS3client() != null) {
+        Thread.sleep(2000); // sync with s3 server
+        ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
+        ObjectListing objectListing = getS3client().listObjects(listObjectsRequest);
+        LOG.info("Found {} files on s3.", objectListing.getObjectSummaries().size());
+        objectListing.getObjectSummaries().forEach(s3ObjectSummary -> LOG.info("Found file in s3 with key {}", s3ObjectSummary.getKey()));
+      }
+    } finally {
+      LOG.info("shutdown containers");
+      try {
+        runCommand(new String[]{shellScriptLocation, "stop"});
+      } finally {
+        // Release the HTTP resources held by the Solr client.
+        if (solrClient != null)
+          solrClient.close();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java
new file mode 100644
index 0000000..4a09d7d
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.steps;
+
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.apache.ambari.infra.InfraClient;
+import org.jbehave.core.annotations.Alias;
+import org.jbehave.core.annotations.Given;
+import org.jbehave.core.annotations.Then;
+import org.jbehave.core.annotations.When;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.time.Duration;
+import java.time.OffsetDateTime;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
+import static org.junit.Assert.assertThat;
+
+/**
+ * JBehave step definitions for the export job stories: they put documents
+ * into Solr, put or delete files in the S3 test bucket, start export jobs
+ * through the Infra Manager client and check the files found in the bucket.
+ */
+public class ExportJobsSteps extends AbstractInfraSteps {
+  private static final Logger LOG = LoggerFactory.getLogger(ExportJobsSteps.class);
+
+  /**
+   * Adds {@code count} documents whose logtime goes back from now in
+   * one-minute steps, then commits.
+   *
+   * @param count number of documents to add
+   */
+  @Given("$count documents in solr")
+  public void addDocuments(int count) throws Exception {
+    for (int i = 0; i < count; ++i) {
+      addDocument(OffsetDateTime.now().minusMinutes(i));
+    }
+    getSolrClient().commit();
+  }
+
+  /**
+   * Adds {@code count} documents whose logtime values are spread evenly over
+   * the interval starting at {@code startLogtime}, then commits.
+   *
+   * @param count number of documents to add; nothing is added when it is not positive
+   * @param startLogtime logtime of the first document
+   * @param endLogtime end of the interval (no document gets exactly this logtime)
+   */
+  @Given("$count documents in solr with logtime from $startLogtime to $endLogtime")
+  public void addDocuments(long count, OffsetDateTime startLogtime, OffsetDateTime endLogtime) throws Exception {
+    // Guard the division below: a count of 0 used to fail with ArithmeticException.
+    if (count > 0) {
+      long increment = Duration.between(startLogtime, endLogtime).toNanos() / count;
+      // long index: an int index could overflow before reaching a large long count.
+      for (long i = 0; i < count; ++i) {
+        addDocument(startLogtime.plusNanos(increment * i));
+      }
+    }
+    getSolrClient().commit();
+  }
+
+  /**
+   * Uploads a small dummy object to the S3 test bucket.
+   *
+   * @param key key of the object to create
+   */
+  @Given("a file on s3 with key $key")
+  public void addFileToS3(String key) throws Exception {
+    // Explicit charset keeps the payload independent of the platform default.
+    byte[] content = "anything".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    // Content length is set explicitly because the upload source is a stream.
+    ObjectMetadata metadata = new ObjectMetadata();
+    metadata.setContentLength(content.length);
+    try (ByteArrayInputStream inputStream = new ByteArrayInputStream(content)) {
+      getS3client().putObject(S3_BUCKET_NAME, key, inputStream, metadata);
+    }
+  }
+
+  /**
+   * Starts the named job without parameters.
+   *
+   * @param jobName name of the job to start
+   */
+  @When("start $jobName job")
+  public void startJob(String jobName) throws Exception {
+    startJob(jobName, null);
+  }
+
+  /**
+   * Starts (or restarts) the named job with the given parameter text.
+   *
+   * @param jobName name of the job to start
+   * @param parameters parameter text passed to the client unchanged; may be {@code null}
+   */
+  @When("start $jobName job with parameters $parameters")
+  @Alias("restart $jobName job with parameters $parameters")
+  public void startJob(String jobName, String parameters) throws Exception {
+    try (InfraClient httpClient = getInfraClient()) {
+      httpClient.startJob(jobName, parameters);
+    }
+  }
+
+  /**
+   * Deletes an object from the S3 test bucket.
+   *
+   * @param key key of the object to delete
+   */
+  @When("delete file with key $key from s3")
+  public void deleteFileFromS3(String key) {
+    getS3client().deleteObject(S3_BUCKET_NAME, key);
+  }
+
+  /**
+   * Waits (through {@code doWithin}) for the test bucket to exist and hold at
+   * least one object, then asserts that some object key contains {@code text}.
+   *
+   * @param text text expected in at least one object key
+   * @param waitSec wait limit handed to {@code doWithin}
+   */
+  @Then("Check filenames contains the text $text on s3 server after $waitSec seconds")
+  public void checkS3After(String text, int waitSec) throws Exception {
+    AmazonS3Client s3Client = getS3client();
+    ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
+    doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME)
+        && !s3Client.listObjects(listObjectsRequest).getObjectSummaries().isEmpty());
+
+    ObjectListing objectListing = s3Client.listObjects(listObjectsRequest);
+    assertThat(objectListing.getObjectSummaries(), hasItem(hasProperty("key", containsString(text))));
+  }
+
+  /**
+   * Waits (through {@code doWithin}) for the test bucket to hold exactly
+   * {@code count} objects whose key contains {@code text}.
+   *
+   * @param count expected number of matching objects
+   * @param text text the object keys must contain
+   * @param waitSec wait limit handed to {@code doWithin}
+   */
+  @Then("Check $count files exists on s3 server with filenames containing the text $text after $waitSec seconds")
+  public void checkNumberOfFilesOnS3(int count, String text, int waitSec) {
+    AmazonS3Client s3Client = getS3client();
+    ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
+    doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME)
+        && s3Client.listObjects(listObjectsRequest).getObjectSummaries().stream()
+        .filter(s3ObjectSummary -> s3ObjectSummary.getKey().contains(text))
+        .count() == count);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager-it/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/resources/log4j.properties b/ambari-infra/ambari-infra-manager-it/src/test/resources/log4j.properties
new file mode 100644
index 0000000..956bc63
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager-it/src/test/resources/log4j.properties
@@ -0,0 +1,16 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootLogger=INFO, stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story b/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story
new file mode 100644
index 0000000..cd1f49d
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story
@@ -0,0 +1,23 @@
+Scenario: Export documents from solr and upload them to s3 using default configuration
+
+Given 1000 documents in solr
+When start export_audit_logs job
+Then Check filenames contains the text audit_logs on s3 server after 20 seconds
+
+
+Scenario: Exporting 10 documents using writeBlockSize=3 produces 4 files
+
+Given 10 documents in solr with logtime from 2010-10-09T05:00:00.000Z to 2010-10-09T20:00:00.000Z
+When start export_audit_logs job with parameters writeBlockSize=3,start=2010-10-09T00:00:00.000Z,end=2010-10-11T00:00:00.000Z
+Then Check 4 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2010-10-09 after 20 seconds
+
+
+Scenario: Export job fails when part of the data is exported. After resolving the issue and restarting the job exports the rest of the data.
+
+Given 200 documents in solr with logtime from 2011-10-09T05:00:00.000Z to 2011-10-09T20:00:00.000Z
+And a file on s3 with key solr_archive_audit_logs_-_2011-10-09T08:00:00.000Z.json.tar.gz
+When start export_audit_logs job with parameters writeBlockSize=20,start=2010-11-09T00:00:00.000Z,end=2011-10-11T00:00:00.000Z
+Then Check 3 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2011-10-09 after 20 seconds
+When delete file with key solr_archive_audit_logs_-_2011-10-09T08:00:00.000Z.json.tar.gz from s3
+And restart export_audit_logs job with parameters writeBlockSize=20,start=2010-11-09T00:00:00.000Z,end=2011-10-11T00:00:00.000Z
+Then Check 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2011-10-09 after 20 seconds
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/docker/Dockerfile b/ambari-infra/ambari-infra-manager/docker/Dockerfile
index adb584a..eaefe95 100644
--- a/ambari-infra/ambari-infra-manager/docker/Dockerfile
+++ b/ambari-infra/ambari-infra-manager/docker/Dockerfile
@@ -22,9 +22,9 @@ RUN yum -y install glibc-common
ENV HOME /root
#Install JAVA
-ENV JAVA_VERSION 8u31
-ENV BUILD_VERSION b13
-RUN wget --no-cookies --no-check-certificate --header "Cookie: oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jdk/$JAVA_VERSION-$BUILD_VERSION/jdk-$JAVA_VERSION-linux-x64.rpm" -O jdk-8-linux-x64.rpm
+ENV JAVA_VERSION 8u131
+ENV BUILD_VERSION b11
+RUN wget --no-check-certificate --no-cookies --header "Cookie:oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/$JAVA_VERSION-$BUILD_VERSION/d54c1d3a095b4ff2b6607d096fa80163/jdk-$JAVA_VERSION-linux-x64.rpm -O jdk-8-linux-x64.rpm
RUN rpm -ivh jdk-8-linux-x64.rpm
ENV JAVA_HOME /usr/java/default/
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/docker/docker-compose.yml
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/docker/docker-compose.yml b/ambari-infra/ambari-infra-manager/docker/docker-compose.yml
new file mode 100644
index 0000000..1172631
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/docker/docker-compose.yml
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+version: '3.3'
+services:
+ zookeeper:
+ image: zookeeper:${ZOOKEEPER_VERSION:-3.4.10}
+ restart: always
+ hostname: zookeeper
+ networks:
+ - infra-network
+ ports:
+ - 2181:2181
+ environment:
+ ZOO_MY_ID: 1
+ ZOO_SERVERS: server.1=zookeeper:2888:3888
+ solr:
+ image: solr:${SOLR_VERSION:-6.6.2}
+ restart: always
+ hostname: solr
+ ports:
+ - "8983:8983"
+ networks:
+ - infra-network
+ env_file:
+ - Profile
+ entrypoint:
+ - docker-entrypoint.sh
+ - solr
+ - start
+ - "-f"
+ - "-c"
+ - "-z"
+ - ${ZOOKEEPER_CONNECTION_STRING}
+ volumes:
+ - $AMBARI_LOCATION/ambari-logsearch/ambari-logsearch-server/src/main/configsets:/opt/solr/configsets
+ localstack-s3:
+ image: localstack/localstack
+ ports:
+ - "4569:4569"
+ environment:
+ - SERVICES=s3:4569
+ hostname: fakes3
+ networks:
+ infra-network:
+ aliases:
+ - testbucket.fakes3
+ env_file:
+ - Profile
+ inframanager:
+ image: ambari-infra-manager:v1.0
+ restart: always
+ hostname: infra-manager.apache.org
+ networks:
+ - infra-network
+ env_file:
+ - Profile
+ ports:
+ - 61890:61890
+ - 5007:5007
+ environment:
+ COMPONENT: infra-manager
+ COMPONENT_LOG: infra-manager
+ ZK_CONNECT_STRING: ${ZOOKEEPER_CONNECTION_STRING}
+ DISPLAY: $DOCKERIP:0
+ volumes:
+ - $AMBARI_LOCATION/ambari-infra/ambari-infra-manager/target/package:/root/ambari-infra-manager
+networks:
+ infra-network:
+ driver: bridge
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh b/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh
new file mode 100644
index 0000000..ab02659
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh
@@ -0,0 +1,105 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+# Directory containing this script; all other paths are resolved against it.
+sdir="$(dirname "$0")"
+# Abort with a usage hint when no command argument was given.
+: ${1:?"argument is missing: (start|stop)"}
+command="${1}"
+
+# Removes any previous containers and starts the docker-compose environment.
+# Returns the exit status of "docker-compose up" so a failed start is visible
+# to the caller instead of being reported as "Containers started".
+function start_containers() {
+  check_env_files
+  echo "Start containers ..."
+  # Quoted so a checkout path containing spaces works; stop if it is missing.
+  pushd "$sdir/../" || exit 1
+  local AMBARI_INFRA_MANAGER_LOCATION=$(pwd)
+  echo "$AMBARI_INFRA_MANAGER_LOCATION"
+  kill_containers
+  cd "$AMBARI_INFRA_MANAGER_LOCATION/docker" || exit 1
+  docker-compose up -d
+  local status=$?
+  popd
+  if [ "$status" -ne 0 ]; then
+    echo "Failed to start containers (docker-compose exit status $status)" >&2
+    return "$status"
+  fi
+  echo "Containers started"
+}
+
+# Makes sure the .env and Profile files exist next to this script.
+# check_env_file returns 1 for every file it had to create; if any file was
+# created the script stops so the user can review it before the first start.
+function check_env_files() {
+  local count=0;
+
+  check_env_file .env setup_env
+  count=$((count + $?));
+  check_env_file Profile setup_profile
+  count=$((count + $?));
+
+  if [[ "$count" -gt 0 ]]
+  then
+    echo "Exit"
+    # Non-zero status: the containers were NOT started. A bare "exit" would
+    # report the status of the preceding echo, i.e. success.
+    exit 1;
+  fi
+}
+
+# Checks that a file exists in the script directory and creates it if missing.
+#   $1 - file name relative to the script directory
+#   $2 - command that creates the file
+# Returns 0 when the file already existed, 1 when it had to be created.
+function check_env_file() {
+  local file_name="$1"
+  local create_command="$2"
+
+  if [ ! -f "$sdir/$file_name" ]; then
+    echo "$file_name file does not exist, Creating a new one..."
+    $create_command
+    echo "$file_name file has been created. Check it out before starting Ambari Infra Manager. ($sdir/$file_name)"
+    return 1
+  fi
+
+  echo "$file_name file exists"
+  return 0
+}
+
+# Writes the .env file read by docker-compose next to this script.
+function setup_env() {
+  # Resolve the location in a subshell: no pushd/popd output, caller's
+  # working directory untouched, and paths with spaces are handled.
+  # NOTE(review): this is two levels above docker/, while docker-compose.yml
+  # mounts $AMBARI_LOCATION/ambari-infra/... and $AMBARI_LOCATION/ambari-logsearch/...,
+  # which looks like it expects the repository root - confirm.
+  local AMBARI_LOCATION
+  AMBARI_LOCATION=$(cd "$sdir/../../" && pwd)
+
+  # get_docker_ip is not defined in this script; only call it when some
+  # function or command of that name is actually available.
+  local docker_ip=""
+  if type get_docker_ip > /dev/null 2>&1; then
+    docker_ip=$(get_docker_ip)
+  else
+    echo "get_docker_ip is not available; DOCKERIP is left empty in $sdir/.env" >&2
+  fi
+
+  cat << EOF > "$sdir/.env"
+DOCKERIP=$docker_ip
+MAVEN_REPOSITORY_LOCATION=$HOME/.m2
+AMBARI_LOCATION=$AMBARI_LOCATION
+
+ZOOKEEPER_VERSION=3.4.10
+ZOOKEEPER_CONNECTION_STRING=zookeeper:2181
+
+SOLR_VERSION=6.6.2
+EOF
+}
+
+# Creates an empty Profile file next to this script; docker-compose.yml
+# references it as env_file. The unused AMBARI_LOCATION lookup (and its
+# pushd/popd output) has been dropped.
+function setup_profile() {
+  cat << EOF > "$sdir/Profile"
+EOF
+}
+
+# Force-removes the containers of the docker-compose environment, one by one.
+function kill_containers() {
+  echo "Try to remove containers if exists ..."
+  local container
+  for container in docker_inframanager_1 docker_solr_1 docker_zookeeper_1 docker_localstack-s3_1; do
+    docker rm -f "$container"
+  done
+}
+
+# Dispatch on the command given as first argument.
+case $command in
+  "start")
+     start_containers
+     ;;
+  "stop")
+     kill_containers
+     ;;
+  *)
+     # Only start and stop are implemented; report anything else as an error.
+     echo "Available commands: (start|stop)"
+     exit 1
+     ;;
+esac
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/ObjectSource.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/ObjectSource.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/ObjectSource.java
new file mode 100644
index 0000000..98a1e0d
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/ObjectSource.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job;
+
+/**
+ * A source of objects of type {@code T} that can be opened for iteration.
+ *
+ * @param <T> type of the objects provided by the source
+ */
+public interface ObjectSource<T> {
+ /**
+  * Opens an iterator over objects of this source.
+  *
+  * @param current NOTE(review): presumably the object iteration should continue
+  *                from (e.g. the last one already processed) - confirm against
+  *                the implementations
+  * @param rows NOTE(review): presumably the number of objects to fetch per
+  *             block - confirm against the implementations
+  * @return an iterator over the objects
+  */
+ CloseableIterator<T> open(T current, int rows);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/AbstractFileAction.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/AbstractFileAction.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/AbstractFileAction.java
new file mode 100644
index 0000000..7a30393
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/AbstractFileAction.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import java.io.File;
+
+/**
+ * Base class for file actions that produce an output file from an input file
+ * and can optionally delete the input afterwards.
+ */
+public abstract class AbstractFileAction implements FileAction {
+  /**
+   * Performs the action on the given file.
+   *
+   * @param inputFile the file to process
+   * @return the file produced by the action
+   */
+  protected abstract File perform(File inputFile);
+
+  /**
+   * Performs the action and, when requested, deletes the input file afterwards.
+   *
+   * @param inputFile the file to process
+   * @param deleteInput whether to delete {@code inputFile} once the action has run
+   * @return the file produced by the action
+   */
+  @Override
+  public File perform(File inputFile, boolean deleteInput) {
+    final File result = perform(inputFile);
+    if (!deleteInput) {
+      return result;
+    }
+    // The outcome of the deletion is not checked.
+    inputFile.delete();
+    return result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java
index 84ce160..8421802 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java
@@ -19,6 +19,7 @@
package org.apache.ambari.infra.job.archive;
import java.io.File;
+import java.util.ArrayList;
import java.util.List;
import static java.util.Arrays.asList;
@@ -28,7 +29,7 @@ public class CompositeFileAction implements FileAction {
private final List<FileAction> actions;
public CompositeFileAction(FileAction... actions) {
- this.actions = asList(actions);
+ this.actions = new ArrayList<>(asList(actions));
}
public void add(FileAction action) {
@@ -36,10 +37,10 @@ public class CompositeFileAction implements FileAction {
}
@Override
- public File perform(File inputFile) {
+ public File perform(File inputFile, boolean deleteInput) {
File file = inputFile;
for (FileAction action : actions) {
- file = action.perform(file);
+ file = action.perform(file, deleteInput);
}
return file;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java
index 84f5ece..1f3957a 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java
@@ -26,7 +26,6 @@ import java.util.Map;
import static java.util.Collections.unmodifiableMap;
-// TODO: create entities for each solr collections
public class Document {
private final Map<String, String> fieldMap;
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java
index 69f41d3..1895911 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java
@@ -18,6 +18,7 @@
*/
package org.apache.ambari.infra.job.archive;
+import org.apache.ambari.infra.job.ObjectSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
@@ -26,28 +27,23 @@ import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
+import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import javax.annotation.PostConstruct;
import javax.inject.Inject;
import java.io.File;
import java.nio.file.Paths;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
-
-import static org.apache.ambari.infra.job.archive.SolrDocumentSource.SOLR_DATETIME_FORMATTER;
-import static org.apache.commons.lang.StringUtils.isBlank;
@Configuration
public class DocumentExportConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(DocumentExportConfiguration.class);
- private static final DateTimeFormatter FILENAME_DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH_mm_ss.SSSX");
@Inject
- private DocumentExportProperties properties;
+ private DocumentExportPropertyMap propertyMap;
@Inject
private StepBuilderFactory steps;
@@ -55,11 +51,26 @@ public class DocumentExportConfiguration {
@Inject
private JobBuilderFactory jobs;
+ @Inject
+ @Qualifier("exportStep")
+ private Step exportStep;
+
+ @Inject
+ private JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor;
- @Bean
- public Job logExportJob(@Qualifier("exportStep") Step logExportStep) {
- return jobs.get("solr_data_export").listener(new DocumentExportJobListener()).start(logExportStep).build();
+ @PostConstruct
+ public void createJobs() {
+ propertyMap.getSolrDataExport().values().forEach(DocumentExportProperties::validate);
+
+ propertyMap.getSolrDataExport().keySet().forEach(jobName -> {
+ Job job = logExportJob(jobName, exportStep);
+ jobRegistryBeanPostProcessor.postProcessAfterInitialization(job, jobName);
+ });
+ }
+
+ private Job logExportJob(String jobName, Step logExportStep) {
+ return jobs.get(jobName).listener(new DocumentExportJobListener(propertyMap)).start(logExportStep).build();
}
@Bean
@@ -67,16 +78,17 @@ public class DocumentExportConfiguration {
public Step exportStep(DocumentExporter documentExporter) {
return steps.get("export")
.tasklet(documentExporter)
- .listener(new DocumentExportStepListener(properties))
.build();
}
@Bean
@StepScope
- public DocumentExporter getDocumentExporter(DocumentItemReader documentItemReader,
- @Value("#{stepExecution.jobExecution.id}") String jobId) {
+ public DocumentExporter documentExporter(DocumentItemReader documentItemReader,
+ @Value("#{stepExecution.jobExecution.id}") String jobId,
+ @Value("#{stepExecution.jobExecution.executionContext.get('exportProperties')}") DocumentExportProperties properties) {
File path = Paths.get(
properties.getDestinationDirectoryPath(),
+ // TODO: jobId should remain the same after continuing job
String.format("%s_%s", properties.getQuery().getCollection(), jobId)).toFile(); // TODO: add end date
LOG.info("Destination directory path={}", path);
if (!path.exists()) {
@@ -86,33 +98,43 @@ public class DocumentExportConfiguration {
}
CompositeFileAction fileAction = new CompositeFileAction(new TarGzCompressor());
+ properties.s3Properties().ifPresent(s3Properties -> fileAction.add(new S3Uploader(s3Properties)));
return new DocumentExporter(
documentItemReader,
- firstDocument -> new LocalDocumentItemWriter(
- new File(path, String.format("%s_-_%s.json",
- properties.getQuery().getCollection(),
- firstDocument.get(properties.getFileNameSuffixColumn()))),
- fileAction),
+ firstDocument -> localDocumentItemWriter(properties, path, fileAction, firstDocument),
properties.getWriteBlockSize());
}
+ private LocalDocumentItemWriter localDocumentItemWriter(DocumentExportProperties properties, File path, FileAction fileAction, Document firstDocument) {
+ return new LocalDocumentItemWriter(outFile(properties.getQuery().getCollection(), path, firstDocument.get(properties.getFileNameSuffixColumn())),
+ file -> fileAction.perform(file, true));
+ }
+
+ private File outFile(String collection, File directoryPath, String suffix) {
+ // TODO: format date (suffix)
+ File file = new File(directoryPath, String.format("%s_-_%s.json", collection, suffix));
+ LOG.info("Exporting to temp file {}", file.getAbsolutePath());
+ return file;
+ }
+
@Bean
@StepScope
- public DocumentItemReader reader(DocumentSource documentSource) {
+ public DocumentItemReader reader(ObjectSource<Document> documentSource,
+ @Value("#{stepExecution.jobExecution.executionContext.get('exportProperties')}") DocumentExportProperties properties) {
return new DocumentItemReader(documentSource, properties.getReadBlockSize());
}
@Bean
@StepScope
- public DocumentSource logSource(@Value("#{jobParameters[endDate]}") String endDateText) {
- OffsetDateTime endDate = OffsetDateTime.now(ZoneOffset.UTC);
- if (!isBlank(endDateText))
- endDate = OffsetDateTime.parse(endDateText);
+ public ObjectSource logSource(@Value("#{jobParameters[start]}") String start,
+ @Value("#{jobParameters[end]}") String end,
+ @Value("#{stepExecution.jobExecution.executionContext.get('exportProperties')}") DocumentExportProperties properties) {
return new SolrDocumentSource(
- properties.getZooKeeperSocket(),
+ properties.getZooKeeperConnectionString(),
properties.getQuery(),
- SOLR_DATETIME_FORMATTER.format(endDate));
+ start,
+ end);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java
index f1df46c..3b6c402 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java
@@ -23,9 +23,32 @@ import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
public class DocumentExportJobListener implements JobExecutionListener {
+
+ private final DocumentExportPropertyMap propertyMap;
+
+ public DocumentExportJobListener(DocumentExportPropertyMap propertyMap) {
+ this.propertyMap = propertyMap;
+ }
+
+
@Override
public void beforeJob(JobExecution jobExecution) {
+ try {
+ String jobName = jobExecution.getJobInstance().getJobName();
+ DocumentExportProperties defaultProperties = propertyMap.getSolrDataExport().get(jobName);
+ if (defaultProperties == null)
+ throw new UnsupportedOperationException("Properties not found for job " + jobName);
+ DocumentExportProperties properties = defaultProperties.deepCopy();
+ properties.apply(jobExecution.getJobParameters());
+ properties.validate();
+ jobExecution.getExecutionContext().put("exportProperties", properties);
+ }
+ catch (UnsupportedOperationException | IllegalArgumentException ex) {
+ jobExecution.stop();
+ jobExecution.setExitStatus(new ExitStatus(ExitStatus.FAILED.getExitCode(), ex.getMessage()));
+ throw ex;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java
index d6301c0..37f6d1b 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java
@@ -18,38 +18,34 @@
*/
package org.apache.ambari.infra.job.archive;
-import org.hibernate.validator.constraints.NotBlank;
+import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.batch.core.JobParameters;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.PropertySource;
-import javax.validation.constraints.Min;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Optional;
import static org.apache.commons.lang.StringUtils.isBlank;
-@Configuration
-@PropertySource(value = {"classpath:infra-manager.properties"})
-@ConfigurationProperties(prefix = "infra-manager.jobs.solr_data_export")
public class DocumentExportProperties {
- @NotBlank
- private String zooKeeperSocket;
- @Min(1)
+ private String zooKeeperConnectionString;
private int readBlockSize;
- @Min(1)
private int writeBlockSize;
- @NotBlank
private String destinationDirectoryPath;
- @NotBlank
private String fileNameSuffixColumn;
private SolrQueryProperties query;
-
- public String getZooKeeperSocket() {
- return zooKeeperSocket;
+ private String s3AccessKey;
+ private String s3SecretKey;
+ private String s3KeyPrefix;
+ private String s3BucketName;
+ private String s3Endpoint;
+
+ public String getZooKeeperConnectionString() {
+ return zooKeeperConnectionString;
}
- public void setZooKeeperSocket(String zooKeeperSocket) {
- this.zooKeeperSocket = zooKeeperSocket;
+ public void setZooKeeperConnectionString(String zooKeeperConnectionString) {
+ this.zooKeeperConnectionString = zooKeeperConnectionString;
}
public int getReadBlockSize() {
@@ -76,37 +72,109 @@ public class DocumentExportProperties {
this.destinationDirectoryPath = destinationDirectoryPath;
}
+ public String getFileNameSuffixColumn() {
+ return fileNameSuffixColumn;
+ }
+
+ public void setFileNameSuffixColumn(String fileNameSuffixColumn) {
+ this.fileNameSuffixColumn = fileNameSuffixColumn;
+ }
+
+ public SolrQueryProperties getQuery() {
+ return query;
+ }
+
+ public void setQuery(SolrQueryProperties query) {
+ this.query = query;
+ }
+
+ public String getS3AccessKey() {
+ return s3AccessKey;
+ }
+
+ public void setS3AccessKey(String s3AccessKey) {
+ this.s3AccessKey = s3AccessKey;
+ }
+
+ public String getS3SecretKey() {
+ return s3SecretKey;
+ }
+
+ public void setS3SecretKey(String s3SecretKey) {
+ this.s3SecretKey = s3SecretKey;
+ }
+
+ public String getS3KeyPrefix() {
+ return s3KeyPrefix;
+ }
+
+ public void setS3KeyPrefix(String s3KeyPrefix) {
+ this.s3KeyPrefix = s3KeyPrefix;
+ }
+
+ public String getS3BucketName() {
+ return s3BucketName;
+ }
+
+ public void setS3BucketName(String s3BucketName) {
+ this.s3BucketName = s3BucketName;
+ }
+
+ public String getS3Endpoint() {
+ return s3Endpoint;
+ }
+
+ public void setS3Endpoint(String s3Endpoint) {
+ this.s3Endpoint = s3Endpoint;
+ }
+
+ /**
+ * Overrides the configured values with the matching job parameters.
+ * Each property passes its current value as the fallback, so a parameter that is not supplied
+ * leaves the property unchanged. Only zooKeeperConnectionString, readBlockSize, writeBlockSize,
+ * destinationDirectoryPath and the query are handled here; fileNameSuffixColumn and the S3
+ * settings are not overridable through job parameters.
+ *
+ * @param jobParameters the parameters of the job being launched
+ */
public void apply(JobParameters jobParameters) {
- // TODO: solr query params
- zooKeeperSocket = jobParameters.getString("zooKeeperSocket", zooKeeperSocket);
+ zooKeeperConnectionString = jobParameters.getString("zooKeeperConnectionString", zooKeeperConnectionString);
readBlockSize = getIntJobParameter(jobParameters, "readBlockSize", readBlockSize);
writeBlockSize = getIntJobParameter(jobParameters, "writeBlockSize", writeBlockSize);
destinationDirectoryPath = jobParameters.getString("destinationDirectoryPath", destinationDirectoryPath);
- query.setCollection(jobParameters.getString("collection", query.getCollection()));
- query.setQueryText(jobParameters.getString("queryText", query.getQueryText()));
- query.setFilterQueryText(jobParameters.getString("filterQueryText", query.getFilterQueryText()));
+ // The query object applies its own job parameters.
+ query.apply(jobParameters);
}
+ /**
+ * Reads an integer job parameter that is supplied as text.
+ *
+ * @param jobParameters the parameters to read from
+ * @param parameterName the name of the parameter
+ * @param defaultValue the value returned when the parameter is missing or blank
+ * @return the parsed parameter value, or defaultValue when no text was supplied
+ * @throws NumberFormatException if the supplied text is not a valid integer
+ */
private int getIntJobParameter(JobParameters jobParameters, String parameterName, int defaultValue) {
- String writeBlockSizeText = jobParameters.getString(parameterName);
- if (isBlank(writeBlockSizeText))
+ String valueText = jobParameters.getString(parameterName);
+ if (isBlank(valueText))
return defaultValue;
- return this.writeBlockSize = Integer.parseInt(writeBlockSizeText);
+ return Integer.parseInt(valueText);
}
- public String getFileNameSuffixColumn() {
- return fileNameSuffixColumn;
+ /**
+ * Creates a copy of this object by serializing it to JSON and reading the JSON back
+ * into a new DocumentExportProperties instance.
+ *
+ * @return the newly created copy
+ * @throws UncheckedIOException if writing or reading the JSON fails
+ */
+ public DocumentExportProperties deepCopy() {
+ try {
+ ObjectMapper objectMapper = new ObjectMapper();
+ String json = objectMapper.writeValueAsString(this);
+ return objectMapper.readValue(json, DocumentExportProperties.class);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
}
- public void setFileNameSuffixColumn(String fileNameSuffixColumn) {
- this.fileNameSuffixColumn = fileNameSuffixColumn;
+ /**
+ * Builds the S3 settings of this export, if they are configured.
+ *
+ * @return the S3 properties when access key, secret key and bucket name are all non-blank,
+ * otherwise an empty Optional
+ */
+ public Optional<S3Properties> s3Properties() {
+ boolean incomplete = isBlank(s3AccessKey) || isBlank(s3SecretKey) || isBlank(s3BucketName);
+ if (incomplete) {
+ return Optional.empty();
+ }
+ return Optional.of(new S3Properties(s3AccessKey, s3SecretKey, s3KeyPrefix, s3BucketName, s3Endpoint));
}
- public SolrQueryProperties getQuery() {
- return query;
- }
+ /**
+ * Checks that every mandatory export property has a usable value and then validates the query.
+ *
+ * @throws IllegalArgumentException if a text property is blank, a block size is not positive,
+ * or the query is missing
+ */
+ public void validate() {
+ if (isBlank(zooKeeperConnectionString))
+ throw new IllegalArgumentException("The property zooKeeperConnectionString can not be null or empty string!");
- public void setQuery(SolrQueryProperties query) {
- this.query = query;
+ // Block sizes must be at least 1, so negative values are rejected as well as zero.
+ if (readBlockSize < 1)
+ throw new IllegalArgumentException("The property readBlockSize must be greater than 0!");
+
+ if (writeBlockSize < 1)
+ throw new IllegalArgumentException("The property writeBlockSize must be greater than 0!");
+
+ if (isBlank(destinationDirectoryPath))
+ throw new IllegalArgumentException("The property destinationDirectoryPath can not be null or empty string!");
+
+ if (isBlank(fileNameSuffixColumn))
+ throw new IllegalArgumentException("The property fileNameSuffixColumn can not be null or empty string!");
+
+ // Report a missing query as a configuration error instead of failing with a NullPointerException.
+ if (query == null)
+ throw new IllegalArgumentException("The property query can not be null!");
+
+ query.validate();
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportPropertyMap.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportPropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportPropertyMap.java
new file mode 100644
index 0000000..9af4afc
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportPropertyMap.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Map;
+
+/**
+ * Configuration properties holder bound to the {@code infra-manager.jobs} prefix.
+ * Exposes the export settings as a map of {@link DocumentExportProperties} under the
+ * {@code solrDataExport} property.
+ * NOTE(review): the map keys are presumably job names - confirm against infra-manager.properties.
+ */
+@Configuration
+@ConfigurationProperties(prefix = "infra-manager.jobs")
+public class DocumentExportPropertyMap {
+ // Export settings keyed by a String identifier; populated through setSolrDataExport.
+ private Map<String, DocumentExportProperties> solrDataExport;
+
+ /** @return the configured export settings, or null if none have been set */
+ public Map<String, DocumentExportProperties> getSolrDataExport() {
+ return solrDataExport;
+ }
+
+ /** @param solrDataExport the export settings to expose */
+ public void setSolrDataExport(Map<String, DocumentExportProperties> solrDataExport) {
+ this.solrDataExport = solrDataExport;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportStepListener.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportStepListener.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportStepListener.java
deleted file mode 100644
index 3bab6d5..0000000
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportStepListener.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.infra.job.archive;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.batch.core.ExitStatus;
-import org.springframework.batch.core.StepExecution;
-import org.springframework.batch.core.StepExecutionListener;
-
-public class DocumentExportStepListener implements StepExecutionListener {
- private static final Logger LOG = LoggerFactory.getLogger(DocumentExportStepListener.class);
-
- private final DocumentExportProperties properties;
-
- public DocumentExportStepListener(DocumentExportProperties properties) {
- this.properties = properties;
- }
-
- @Override
- public void beforeStep(StepExecution stepExecution) {
- properties.apply(stepExecution.getJobParameters());
- LOG.info("LogExport step - before step execution");
- }
-
- @Override
- public ExitStatus afterStep(StepExecution stepExecution) {
- LOG.info("LogExport step - after step execution");
- return stepExecution.getExitStatus();
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java
index a4378a4..3a6b869 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java
@@ -18,6 +18,8 @@
*/
package org.apache.ambari.infra.job.archive;
+import org.apache.ambari.infra.job.CloseableIterator;
+import org.apache.ambari.infra.job.ObjectSource;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
@@ -31,16 +33,16 @@ public class DocumentItemReader extends AbstractItemStreamItemReader<Document> i
public final static String POSITION = "last-read";
- private final DocumentSource documentSource;
+ private final ObjectSource<Document> documentSource;
private final int readBlockSize;
- private DocumentIterator documentIterator = null;
+ private CloseableIterator<Document> documentIterator = null;
private int count = 0;
private boolean eof = false;
private Document current = null;
private Document previous = null;
- public DocumentItemReader(DocumentSource documentSource, int readBlockSize) {
+ public DocumentItemReader(ObjectSource<Document> documentSource, int readBlockSize) {
this.documentSource = documentSource;
this.readBlockSize = readBlockSize;
setName(ClassUtils.getShortName(DocumentItemReader.class));
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java
index 6232cfc..5fa29b0 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java
@@ -16,10 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.ambari.infra.job.archive;
+package org.apache.ambari.infra.job;
import java.util.Iterator;
-// TODO: generic closeable iterator
-public interface DocumentIterator extends Iterator<Document>, AutoCloseable {
+/**
+ * An {@link Iterator} that is also {@link AutoCloseable}, so it can be used in a
+ * try-with-resources statement.
+ *
+ * @param <T> the type of the elements returned by the iterator
+ */
+public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java
index c9871a3..7427771 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java
@@ -18,7 +18,8 @@
*/
package org.apache.ambari.infra.job.archive;
-// TODO: generic object source
-public interface DocumentSource {
- DocumentIterator open(Document current, int rows);
+import java.io.File;
+
+/**
+ * Callback notified when an item writer has finished writing a file.
+ * LocalDocumentItemWriter invokes it from close(), after its underlying writer has been closed.
+ */
+public interface ItemWriterListener {
+ /**
+ * Called once the given file has been completely written.
+ *
+ * @param file the finished output file
+ */
+ void onCompleted(File file);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java
index 26a8c63..d3f2a65 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java
@@ -21,5 +21,5 @@ package org.apache.ambari.infra.job.archive;
import java.io.File;
public interface FileAction {
- File perform(File inputFile);
+ /**
+ * Performs this action on the given file.
+ *
+ * @param inputFile the file to process
+ * @param deleteInput presumably whether the input file should be removed once it has been
+ * processed - TODO confirm against AbstractFileAction
+ * @return the file produced by the action
+ * (NOTE(review): whether this can be the input file itself is not visible here)
+ */
+ File perform(File inputFile, boolean deleteInput);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java
index 02d898d..baad61b 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java
@@ -29,10 +29,10 @@ public class LocalDocumentItemWriter implements DocumentItemWriter {
private final File outFile;
private final BufferedWriter bufferedWriter;
- private final FileAction fileAction;
+ private final ItemWriterListener itemWriterListener;
- public LocalDocumentItemWriter(File outFile, FileAction fileAction) {
- this.fileAction = fileAction;
+ public LocalDocumentItemWriter(File outFile, ItemWriterListener itemWriterListener) {
+ this.itemWriterListener = itemWriterListener;
this.outFile = outFile;
try {
this.bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(outFile), ENCODING));
@@ -64,7 +64,7 @@ public class LocalDocumentItemWriter implements DocumentItemWriter {
public void close() {
try {
bufferedWriter.close();
- fileAction.perform(outFile);
+ itemWriterListener.onCompleted(outFile);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java
index 495401d..0979f10 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java
@@ -18,47 +18,48 @@
*/
package org.apache.ambari.infra.job.archive;
-import org.hibernate.validator.constraints.NotBlank;
-
public class S3Properties {
- @NotBlank
- private String accessKey;
- @NotBlank
- private String secretKey;
- @NotBlank
- private String keyPrefix;
- @NotBlank
- private String bucketName;
-
- public String getAccessKey() {
- return accessKey;
- }
+ // All values are assigned once in the constructor and never changed afterwards.
+ private final String s3AccessKey;
+ private final String s3SecretKey;
+ private final String s3KeyPrefix;
+ private final String s3BucketName;
+ private final String s3EndPoint;
- public String getSecretKey() {
- return secretKey;
+ public S3Properties(String s3AccessKey, String s3SecretKey, String s3KeyPrefix, String s3BucketName, String s3EndPoint) {
+ this.s3AccessKey = s3AccessKey;
+ this.s3SecretKey = s3SecretKey;
+ this.s3KeyPrefix = s3KeyPrefix;
+ this.s3BucketName = s3BucketName;
+ this.s3EndPoint = s3EndPoint;
}
- public String getKeyPrefix() {
- return keyPrefix;
+ public String getS3AccessKey() {
+ return s3AccessKey;
}
- public String getBucketName() {
- return bucketName;
+ public String getS3SecretKey() {
+ return s3SecretKey;
}
- public void setAccessKey(String accessKey) {
- this.accessKey = accessKey;
+ public String getS3KeyPrefix() {
+ return s3KeyPrefix;
}
- public void setSecretKey(String secretKey) {
- this.secretKey = secretKey;
+ public String getS3BucketName() {
+ return s3BucketName;
}
- public void setKeyPrefix(String keyPrefix) {
- this.keyPrefix = keyPrefix;
+ public String getS3EndPoint() {
+ return s3EndPoint;
}
- public void setBucketName(String bucketName) {
- this.bucketName = bucketName;
+ /**
+ * Returns a textual form listing the access key, key prefix, bucket name and endpoint.
+ * The secret key is left out, so printing or logging this object does not expose it.
+ */
+ @Override
+ public String toString() {
+ return "S3Properties{" +
+ "s3AccessKey='" + s3AccessKey + '\'' +
+ ", s3KeyPrefix='" + s3KeyPrefix + '\'' +
+ ", s3BucketName='" + s3BucketName + '\'' +
+ ", s3EndPoint='" + s3EndPoint + '\'' +
+ '}';
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java
index 3214e50..deeb9c7 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java
@@ -2,9 +2,13 @@ package org.apache.ambari.infra.job.archive;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
+import static org.apache.commons.lang.StringUtils.isBlank;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -23,17 +27,25 @@ import java.io.File;
* specific language governing permissions and limitations
* under the License.
*/
-public class S3Uploader implements FileAction {
+public class S3Uploader extends AbstractFileAction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(S3Uploader.class);
private final AmazonS3Client client;
private final String keyPrefix;
private final String bucketName;
+ /**
+ * Creates the uploader and its S3 client from the given properties.
+ * The properties are logged, the key prefix and bucket name are stored for later uploads and the
+ * client is created with the configured access and secret key. The client endpoint is only
+ * overridden when a non-blank endpoint is configured.
+ *
+ * @param s3Properties the S3 connection and destination settings
+ */
public S3Uploader(S3Properties s3Properties) {
- this.keyPrefix = s3Properties.getKeyPrefix();
- this.bucketName = s3Properties.getBucketName();
- BasicAWSCredentials credentials = new BasicAWSCredentials(s3Properties.getAccessKey(), s3Properties.getSecretKey());
+ LOG.info("Initializing S3 client with " + s3Properties);
+
+ this.keyPrefix = s3Properties.getS3KeyPrefix();
+ this.bucketName = s3Properties.getS3BucketName();
+ BasicAWSCredentials credentials = new BasicAWSCredentials(s3Properties.getS3AccessKey(), s3Properties.getS3SecretKey());
client = new AmazonS3Client(credentials);
+ if (!isBlank(s3Properties.getS3EndPoint()))
+ client.setEndpoint(s3Properties.getS3EndPoint());
+// Note: without pathStyleAccess=true the endpoint is going to be <bucketName>.<host>:<port>
+// client.setS3ClientOptions(S3ClientOptions.builder().setPathStyleAccess(true).build());
}
@Override
@@ -41,8 +53,7 @@ public class S3Uploader implements FileAction {
String key = keyPrefix + inputFile.getName();
if (client.doesObjectExist(bucketName, key)) {
- System.out.println("Object '" + key + "' already exists");
- System.exit(0);
+ throw new UnsupportedOperationException(String.format("Object '%s' already exists in bucket '%s'", key, bucketName));
}
client.putObject(bucketName, key, inputFile);
http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java
index db4069b..2e7341d 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java
@@ -18,6 +18,7 @@
*/
package org.apache.ambari.infra.job.archive;
+import org.apache.ambari.infra.job.CloseableIterator;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
@@ -31,7 +32,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.TimeZone;
-public class SolrDocumentIterator implements DocumentIterator {
+public class SolrDocumentIterator implements CloseableIterator<Document> {
private static final DateFormat SOLR_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");