You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/12/04 23:21:08 UTC

incubator-gobblin git commit: [GOBBLIN-329] Add a new basic cluster integration test

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master b7b2bd9d1 -> 462ea8cf2


[GOBBLIN-329] Add a new basic cluster integration test

Closes #2181 from HappyRay/add-cluster-basic-integration-test


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/462ea8cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/462ea8cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/462ea8cf

Branch: refs/heads/master
Commit: 462ea8cf214f22000b198f813979930b4d2dda4b
Parents: b7b2bd9
Author: Ray Yang <ru...@gmail.com>
Authored: Mon Dec 4 15:21:02 2017 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Dec 4 15:21:02 2017 -0800

----------------------------------------------------------------------
 .../GobblinClusterConfigurationKeys.java        |   2 +-
 .../gobblin/cluster/GobblinClusterUtils.java    |   6 +-
 .../gobblin/cluster/ClusterIntegrationTest.java | 220 +++++++++++++++++++
 .../src/test/resources/BasicCluster.conf        |  24 ++
 .../src/test/resources/HelloWorldJob.conf       |  45 ++++
 5 files changed, 293 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/462ea8cf/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index ab6f8b4..653babf 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -35,7 +35,7 @@ public class GobblinClusterConfigurationKeys {
   public static final String STANDALONE_CLUSTER_MODE = "standalone_cluster";
   public static final String STANDALONE_CLUSTER_MODE_KEY = GOBBLIN_CLUSTER_PREFIX + "standaloneMode";
   public static final boolean DEFAULT_STANDALONE_CLUSTER_MODE = false;
-  public static final String CLUSTRER_WORK_DIR = GOBBLIN_CLUSTER_PREFIX + "workDir";
+  public static final String CLUSTER_WORK_DIR = GOBBLIN_CLUSTER_PREFIX + "workDir";
 
   // Helix configuration properties.
   public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + "helix.cluster.name";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/462ea8cf/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
index 3082720..3f53443 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
@@ -17,7 +17,7 @@
 
 package org.apache.gobblin.cluster;
 
-import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTRER_WORK_DIR;
+import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
 
 import com.typesafe.config.Config;
 import java.net.InetAddress;
@@ -54,8 +54,8 @@ public class GobblinClusterUtils {
 
   public static Path getAppWorkDirPathFromConfig(Config config, FileSystem fs,
       String applicationName, String applicationId) {
-    if (config.hasPath(CLUSTRER_WORK_DIR)) {
-      return new Path(config.getString(CLUSTRER_WORK_DIR));
+    if (config.hasPath(CLUSTER_WORK_DIR)) {
+      return new Path(config.getString(CLUSTER_WORK_DIR));
     }
     return new Path(fs.getHomeDirectory(), getAppWorkDirPath(applicationName, applicationId));
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/462ea8cf/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
new file mode 100644
index 0000000..b54db1a
--- /dev/null
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Resources;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.testing.AssertWithBackoff;
+
+
+public class ClusterIntegrationTest {
+
+  public final static Logger _logger = LoggerFactory.getLogger(ClusterIntegrationTest.class);
+  public static final String JOB_CONF_NAME = "HelloWorldJob.conf";
+  Config _config;
+  private Path _workPath;
+  private Path _jobConfigPath;
+  private Path _jobOutputBasePath;
+  private URL _jobConfResourceUrl;
+  private TestingServer _testingZKServer;
+  private GobblinTaskRunner _worker;
+  private GobblinClusterManager _manager;
+
+
+  @Test
+  public void simpleJobShouldComplete() throws Exception {
+    init();
+    startCluster();
+    waitForAndVerifyOutputFiles();
+    shutdownCluster();
+  }
+
+  private void init() throws Exception {
+    initWorkDir();
+    initZooKeeper();
+    initConfig();
+    initJobConfDir();
+    initJobOutputDir();
+  }
+
+  private void initWorkDir() throws IOException {
+    // Relative to the current directory
+    _workPath = Paths.get("gobblin-integration-test-work-dir");
+    _logger.info("Created a new work directory: " + _workPath.toAbsolutePath());
+
+    // Delete the working directory in case the previous test fails to delete the directory
+    // e.g. when the test was killed forcefully under a debugger.
+    deleteWorkDir();
+    Files.createDirectory(_workPath);
+  }
+
+  private void initJobConfDir() throws IOException {
+    String jobConfigDir = _config.getString(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY);
+    _jobConfigPath = Paths.get(jobConfigDir);
+    Files.createDirectories(_jobConfigPath);
+    _jobConfResourceUrl = Resources.getResource(JOB_CONF_NAME);
+    copyJobConfFromResource();
+  }
+
+  private void initJobOutputDir() throws IOException {
+    _jobOutputBasePath = Paths.get(_workPath + "/job-output");
+    Files.createDirectory(_jobOutputBasePath);
+  }
+
+  private void copyJobConfFromResource() throws IOException {
+    try (InputStream resourceStream = _jobConfResourceUrl.openStream()) {
+      File targetFile = new File(_jobConfigPath + "/" + JOB_CONF_NAME);
+      FileUtils.copyInputStreamToFile(resourceStream, targetFile);
+    }
+  }
+
+  private void initZooKeeper() throws Exception {
+    _testingZKServer = new TestingServer(false);
+    _logger.info(
+        "Created testing ZK Server. Connection string : " + _testingZKServer.getConnectString());
+  }
+
+  private void initConfig() {
+    Config configFromResource = getConfigFromResource();
+    Config configOverride = getConfigOverride();
+    _config = configOverride.withFallback(configFromResource).resolve();
+  }
+
+  private Config getConfigOverride() {
+    Map<String, String> configMap = new HashMap<>();
+    String zkConnectionString = _testingZKServer.getConnectString();
+    configMap.put(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY, zkConnectionString);
+    configMap.put(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR, _workPath.toString());
+    Config config = ConfigFactory.parseMap(configMap);
+    return config;
+  }
+
+  private Config getConfigFromResource() {
+    URL url = Resources.getResource("BasicCluster.conf");
+    Config config = ConfigFactory.parseURL(url);
+    return config;
+  }
+
+  @AfterMethod
+  public void tearDown() throws IOException {
+    deleteWorkDir();
+  }
+
+  private void deleteWorkDir() throws IOException {
+    if ((_workPath != null) && Files.exists(_workPath)) {
+      FileUtils.deleteDirectory(_workPath.toFile());
+    }
+  }
+
+  private void createHelixCluster() {
+    String zkConnectionString = _config
+        .getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    String helix_cluster_name = _config
+        .getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+    HelixUtils.createGobblinHelixCluster(zkConnectionString, helix_cluster_name);
+  }
+
+  private void startCluster() throws Exception {
+    _testingZKServer.start();
+    createHelixCluster();
+    startWorker();
+    startManager();
+  }
+
+  private void startWorker() throws Exception {
+    _worker = new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, "Worker",
+        TestHelper.TEST_APPLICATION_ID, "1",
+        _config, Optional.absent());
+
+    // Need to run in another thread since the start call will not return until the stop method
+    // is called.
+    Thread workerThread = new Thread(_worker::start);
+    workerThread.start();
+  }
+
+  private void startManager() throws Exception {
+    _manager = new GobblinClusterManager(TestHelper.TEST_APPLICATION_NAME,
+        TestHelper.TEST_APPLICATION_ID,
+        _config, Optional.absent());
+
+    _manager.start();
+  }
+
+  private void shutdownCluster() throws InterruptedException, IOException {
+    _worker.stop();
+    _manager.stop();
+    _testingZKServer.close();
+  }
+
+  private void waitForAndVerifyOutputFiles() throws Exception {
+
+    AssertWithBackoff asserter = AssertWithBackoff.create().logger(_logger).timeoutMs(60_000)
+        .maxSleepMs(100).backoffFactor(1.5);
+
+    asserter.assertTrue(this::hasExpectedFilesBeenCreated, "Waiting for job-completion");
+  }
+
+  private boolean hasExpectedFilesBeenCreated(Void input) {
+    int numOfFiles = getNumOfOutputFiles(_jobOutputBasePath);
+    return numOfFiles == 1;
+  }
+
+  private int getNumOfOutputFiles(Path jobOutputDir) {
+    Collection<File> outputFiles = FileUtils
+        .listFiles(jobOutputDir.toFile(), new String[]{"txt"}, true);
+    return outputFiles.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/462ea8cf/gobblin-cluster/src/test/resources/BasicCluster.conf
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/resources/BasicCluster.conf b/gobblin-cluster/src/test/resources/BasicCluster.conf
new file mode 100644
index 0000000..a42b41d
--- /dev/null
+++ b/gobblin-cluster/src/test/resources/BasicCluster.conf
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Cluster / Helix configuration properties
+gobblin.cluster.helix.cluster.name=BasicGobblinCluster
+gobblin.cluster.workDir=/tmp/gobblinClusterBasicTest/
+gobblin.cluster.job.conf.path=${gobblin.cluster.workDir}/jobs
+gobblin.cluster.standaloneMode=true
+gobblin.cluster.job.executeInSchedulingThread=false
+jobexecutor.threadpool.size=20

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/462ea8cf/gobblin-cluster/src/test/resources/HelloWorldJob.conf
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/resources/HelloWorldJob.conf b/gobblin-cluster/src/test/resources/HelloWorldJob.conf
new file mode 100644
index 0000000..7db6b09
--- /dev/null
+++ b/gobblin-cluster/src/test/resources/HelloWorldJob.conf
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Number of hellos to generate
+gobblin.source.helloWorld.numHellos=1
+
+# Job Identification
+job.name=HelloWorldTestJob
+job.group=HelloWorldGroup
+job.description="The Hello World Gobblin job"
+
+source.class=org.apache.gobblin.util.test.HelloWorldSource
+
+# also works with local file system
+writer.destination.type=HDFS
+writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder
+writer.output.format=txt
+writer.staging.dir=${gobblin.cluster.workDir}/writer-staging
+writer.output.dir=${gobblin.cluster.workDir}/writer-output
+
+# Need the converter since this writer accepts bytes only.
+converter.classes=org.apache.gobblin.converter.string.StringToBytesConverter
+
+data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
+data.publisher.final.dir=${gobblin.cluster.workDir}/job-output
+data.publisher.replace.final.dir=false
+
+state.store.enabled=false
+
+# Miscellaneous
+job.lock.enabled=false