You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/12/04 23:21:08 UTC
incubator-gobblin git commit: [GOBBLIN-329] Add a new basic cluster integration test
Repository: incubator-gobblin
Updated Branches:
refs/heads/master b7b2bd9d1 -> 462ea8cf2
[GOBBLIN-329] Add a new basic cluster integration test
Closes #2181 from HappyRay/add-cluster-basic-integration-test
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/462ea8cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/462ea8cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/462ea8cf
Branch: refs/heads/master
Commit: 462ea8cf214f22000b198f813979930b4d2dda4b
Parents: b7b2bd9
Author: Ray Yang <ru...@gmail.com>
Authored: Mon Dec 4 15:21:02 2017 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Dec 4 15:21:02 2017 -0800
----------------------------------------------------------------------
.../GobblinClusterConfigurationKeys.java | 2 +-
.../gobblin/cluster/GobblinClusterUtils.java | 6 +-
.../gobblin/cluster/ClusterIntegrationTest.java | 220 +++++++++++++++++++
.../src/test/resources/BasicCluster.conf | 24 ++
.../src/test/resources/HelloWorldJob.conf | 45 ++++
5 files changed, 293 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/462ea8cf/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index ab6f8b4..653babf 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -35,7 +35,7 @@ public class GobblinClusterConfigurationKeys {
public static final String STANDALONE_CLUSTER_MODE = "standalone_cluster";
public static final String STANDALONE_CLUSTER_MODE_KEY = GOBBLIN_CLUSTER_PREFIX + "standaloneMode";
public static final boolean DEFAULT_STANDALONE_CLUSTER_MODE = false;
- public static final String CLUSTRER_WORK_DIR = GOBBLIN_CLUSTER_PREFIX + "workDir";
+ public static final String CLUSTER_WORK_DIR = GOBBLIN_CLUSTER_PREFIX + "workDir";
// Helix configuration properties.
public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + "helix.cluster.name";
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/462ea8cf/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
index 3082720..3f53443 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
@@ -17,7 +17,7 @@
package org.apache.gobblin.cluster;
-import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTRER_WORK_DIR;
+import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
import com.typesafe.config.Config;
import java.net.InetAddress;
@@ -54,8 +54,8 @@ public class GobblinClusterUtils {
public static Path getAppWorkDirPathFromConfig(Config config, FileSystem fs,
String applicationName, String applicationId) {
- if (config.hasPath(CLUSTRER_WORK_DIR)) {
- return new Path(config.getString(CLUSTRER_WORK_DIR));
+ if (config.hasPath(CLUSTER_WORK_DIR)) {
+ return new Path(config.getString(CLUSTER_WORK_DIR));
}
return new Path(fs.getHomeDirectory(), getAppWorkDirPath(applicationName, applicationId));
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/462ea8cf/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
new file mode 100644
index 0000000..b54db1a
--- /dev/null
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Resources;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.testing.AssertWithBackoff;
+
+
+/**
+ * End-to-end test of a minimal Gobblin cluster running in-process.
+ *
+ * <p>The test starts a testing ZooKeeper server, creates the Helix cluster, launches one
+ * {@link GobblinTaskRunner} worker and one {@link GobblinClusterManager}, copies
+ * {@value #JOB_CONF_NAME} into the job configuration directory, and waits until exactly one
+ * {@code .txt} file appears under the job output directory.
+ */
+public class ClusterIntegrationTest {
+
+  public static final Logger _logger = LoggerFactory.getLogger(ClusterIntegrationTest.class);
+  public static final String JOB_CONF_NAME = "HelloWorldJob.conf";
+
+  /** Work directory name, resolved relative to the current working directory. */
+  private static final String WORK_DIR_NAME = "gobblin-integration-test-work-dir";
+  /**
+   * Sub-directory of the work dir the job publishes into; must match
+   * {@code data.publisher.final.dir} in {@value #JOB_CONF_NAME}.
+   */
+  private static final String JOB_OUTPUT_DIR_NAME = "job-output";
+  /** Upper bound on how long to wait for the job output to appear. */
+  private static final int JOB_COMPLETION_TIMEOUT_MS = 60_000;
+
+  // Fully resolved cluster config: test overrides layered over BasicCluster.conf.
+  Config _config;
+  private Path _workPath;
+  private Path _jobConfigPath;
+  private Path _jobOutputBasePath;
+  private URL _jobConfResourceUrl;
+  private TestingServer _testingZKServer;
+  private GobblinTaskRunner _worker;
+  private GobblinClusterManager _manager;
+
+
+  @Test
+  public void simpleJobShouldComplete() throws Exception {
+    init();
+    startCluster();
+    waitForAndVerifyOutputFiles();
+    // Shut down explicitly so shutdown failures fail this test; tearDown() is only the safety
+    // net for runs where an earlier step threw.
+    shutdownCluster();
+  }
+
+  /** Prepares the work dir, the not-yet-started ZooKeeper server, the config and job dirs. */
+  private void init() throws Exception {
+    initWorkDir();
+    initZooKeeper();
+    initConfig();
+    initJobConfDir();
+    initJobOutputDir();
+  }
+
+  /** Recreates an empty work directory relative to the current working directory. */
+  private void initWorkDir() throws IOException {
+    _workPath = Paths.get(WORK_DIR_NAME);
+
+    // Delete the working directory in case a previous run failed to delete it,
+    // e.g. when the test was killed forcefully under a debugger.
+    deleteWorkDir();
+    Files.createDirectory(_workPath);
+    _logger.info("Created a new work directory: " + _workPath.toAbsolutePath());
+  }
+
+  /** Creates the configured job conf directory and copies the job file into it. */
+  private void initJobConfDir() throws IOException {
+    String jobConfigDir = _config.getString(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY);
+    _jobConfigPath = Paths.get(jobConfigDir);
+    Files.createDirectories(_jobConfigPath);
+    _jobConfResourceUrl = Resources.getResource(JOB_CONF_NAME);
+    copyJobConfFromResource();
+  }
+
+  /** Creates the directory the job publishes its output into. */
+  private void initJobOutputDir() throws IOException {
+    _jobOutputBasePath = _workPath.resolve(JOB_OUTPUT_DIR_NAME);
+    Files.createDirectory(_jobOutputBasePath);
+  }
+
+  /** Copies the {@value #JOB_CONF_NAME} classpath resource into the job conf directory. */
+  private void copyJobConfFromResource() throws IOException {
+    try (InputStream resourceStream = _jobConfResourceUrl.openStream()) {
+      File targetFile = _jobConfigPath.resolve(JOB_CONF_NAME).toFile();
+      FileUtils.copyInputStreamToFile(resourceStream, targetFile);
+    }
+  }
+
+  /** Creates the in-process ZooKeeper server; it is started later by startCluster(). */
+  private void initZooKeeper() throws Exception {
+    _testingZKServer = new TestingServer(false);
+    _logger.info("Created testing ZK Server. Connection string : {}",
+        _testingZKServer.getConnectString());
+  }
+
+  /** Builds the cluster config: test overrides take precedence over BasicCluster.conf. */
+  private void initConfig() {
+    Config configFromResource = getConfigFromResource();
+    Config configOverride = getConfigOverride();
+    _config = configOverride.withFallback(configFromResource).resolve();
+  }
+
+  /** Points the cluster at the testing ZooKeeper server and at the local work directory. */
+  private Config getConfigOverride() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY,
+        _testingZKServer.getConnectString());
+    configMap.put(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR, _workPath.toString());
+    return ConfigFactory.parseMap(configMap);
+  }
+
+  private Config getConfigFromResource() {
+    URL url = Resources.getResource("BasicCluster.conf");
+    return ConfigFactory.parseURL(url);
+  }
+
+  /**
+   * Stops whatever a failed test left running, then removes the work directory. Shutting down
+   * first keeps the cluster from writing into a directory that is being deleted.
+   */
+  @AfterMethod
+  public void tearDown() throws IOException {
+    try {
+      shutdownCluster();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while shutting down the test cluster", e);
+    } finally {
+      deleteWorkDir();
+    }
+  }
+
+  private void deleteWorkDir() throws IOException {
+    if (_workPath != null && Files.exists(_workPath)) {
+      FileUtils.deleteDirectory(_workPath.toFile());
+    }
+  }
+
+  private void createHelixCluster() {
+    String zkConnectionString =
+        _config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    String helixClusterName =
+        _config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+    HelixUtils.createGobblinHelixCluster(zkConnectionString, helixClusterName);
+  }
+
+  private void startCluster() throws Exception {
+    _testingZKServer.start();
+    createHelixCluster();
+    startWorker();
+    startManager();
+  }
+
+  private void startWorker() throws Exception {
+    _worker = new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, "Worker",
+        TestHelper.TEST_APPLICATION_ID, "1",
+        _config, Optional.absent());
+
+    // start() does not return until stop() is called, so it needs its own thread. The thread is
+    // a daemon so that a worker which never stops cannot keep the test JVM alive.
+    Thread workerThread = new Thread(_worker::start, "cluster-integration-test-worker");
+    workerThread.setDaemon(true);
+    workerThread.start();
+  }
+
+  private void startManager() throws Exception {
+    _manager = new GobblinClusterManager(TestHelper.TEST_APPLICATION_NAME,
+        TestHelper.TEST_APPLICATION_ID,
+        _config, Optional.absent());
+
+    _manager.start();
+  }
+
+  /**
+   * Stops the worker, then the manager, then closes the ZooKeeper server. Each step runs even if
+   * an earlier one throws. Each field is cleared before its component is stopped, so a second call
+   * (e.g. from tearDown()) is a no-op.
+   */
+  private void shutdownCluster() throws InterruptedException, IOException {
+    try {
+      if (_worker != null) {
+        GobblinTaskRunner worker = _worker;
+        _worker = null;
+        worker.stop();
+      }
+    } finally {
+      try {
+        if (_manager != null) {
+          GobblinClusterManager manager = _manager;
+          _manager = null;
+          manager.stop();
+        }
+      } finally {
+        if (_testingZKServer != null) {
+          TestingServer zkServer = _testingZKServer;
+          _testingZKServer = null;
+          zkServer.close();
+        }
+      }
+    }
+  }
+
+  /** Polls with backoff until the job has published its expected output file. */
+  private void waitForAndVerifyOutputFiles() throws Exception {
+    AssertWithBackoff asserter = AssertWithBackoff.create().logger(_logger)
+        .timeoutMs(JOB_COMPLETION_TIMEOUT_MS).maxSleepMs(100).backoffFactor(1.5);
+
+    asserter.assertTrue(this::haveExpectedFilesBeenCreated, "Waiting for job-completion");
+  }
+
+  /** Backoff predicate (input unused): true once exactly one output file exists. */
+  private boolean haveExpectedFilesBeenCreated(Void input) {
+    return getNumOfOutputFiles(_jobOutputBasePath) == 1;
+  }
+
+  /**
+   * Counts {@code .txt} files under {@code jobOutputDir}, recursively. Returns 0 when the
+   * directory is missing instead of letting {@code FileUtils.listFiles} throw.
+   */
+  private int getNumOfOutputFiles(Path jobOutputDir) {
+    if (!Files.isDirectory(jobOutputDir)) {
+      return 0;
+    }
+    Collection<File> outputFiles =
+        FileUtils.listFiles(jobOutputDir.toFile(), new String[]{"txt"}, true);
+    return outputFiles.size();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/462ea8cf/gobblin-cluster/src/test/resources/BasicCluster.conf
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/resources/BasicCluster.conf b/gobblin-cluster/src/test/resources/BasicCluster.conf
new file mode 100644
index 0000000..a42b41d
--- /dev/null
+++ b/gobblin-cluster/src/test/resources/BasicCluster.conf
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Cluster / Helix configuration properties
+gobblin.cluster.helix.cluster.name=BasicGobblinCluster
+gobblin.cluster.workDir=/tmp/gobblinClusterBasicTest/
+gobblin.cluster.job.conf.path=${gobblin.cluster.workDir}/jobs
+gobblin.cluster.standaloneMode=true
+gobblin.cluster.job.executeInSchedulingThread=false
+jobexecutor.threadpool.size=20
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/462ea8cf/gobblin-cluster/src/test/resources/HelloWorldJob.conf
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/resources/HelloWorldJob.conf b/gobblin-cluster/src/test/resources/HelloWorldJob.conf
new file mode 100644
index 0000000..7db6b09
--- /dev/null
+++ b/gobblin-cluster/src/test/resources/HelloWorldJob.conf
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Number of hellos to generate
+gobblin.source.helloWorld.numHellos=1
+
+# Job Identification
+job.name=HelloWorldTestJob
+job.group=HelloWorldGroup
+job.description="The Hello World Gobblin job"
+
+source.class=org.apache.gobblin.util.test.HelloWorldSource
+
+# Although the destination type is HDFS, this also works with the local file system.
+writer.destination.type=HDFS
+writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder
+writer.output.format=txt
+writer.staging.dir=${gobblin.cluster.workDir}/writer-staging
+writer.output.dir=${gobblin.cluster.workDir}/writer-output
+
+# Need the converter since this writer accepts bytes only.
+converter.classes=org.apache.gobblin.converter.string.StringToBytesConverter
+
+data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
+data.publisher.final.dir=${gobblin.cluster.workDir}/job-output
+data.publisher.replace.final.dir=false
+
+state.store.enabled=false
+
+# Miscellaneous
+job.lock.enabled=false